From 1ddd598d317e585e41aba12cec96e49181b9bdb4 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Fri, 12 Nov 2021 11:06:40 -0500 Subject: [PATCH] Implement controller and kubelet changes for recovery from resize failures --- .../resourcequota/resource_quota_monitor.go | 6 +- .../volume/expand/expand_controller.go | 46 +- pkg/volume/csi/csi_client.go | 4 + pkg/volume/testing/testing.go | 9 +- .../util/operationexecutor/fakegenerator.go | 4 + .../operation_executor_test.go | 10 + .../operationexecutor/operation_generator.go | 486 ++++++++++++++++-- .../operation_generator_test.go | 272 +++++++++- pkg/volume/util/resize_util.go | 129 ++++- pkg/volume/util/resize_util_test.go | 2 +- 10 files changed, 877 insertions(+), 91 deletions(-) diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index 1c5c15aab02..c42940b1c88 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -23,7 +23,7 @@ import ( "k8s.io/klog/v2" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -148,6 +148,10 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac oldService := oldObj.(*v1.Service) newService := newObj.(*v1.Service) notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) + case schema.GroupResource{Resource: "persistentvolumeclaims"}: + oldPVC := oldObj.(*v1.PersistentVolumeClaim) + newPVC := newObj.(*v1.PersistentVolumeClaim) + notifyUpdate = core.RequiresQuotaReplenish(newPVC, oldPVC) } if notifyUpdate { event := &event{ diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index b99e8b93d75..1d9bbde29f1 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ 
b/pkg/controller/volume/expand/expand_controller.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -44,12 +45,14 @@ import ( "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/pkg/controller/volume/events" + "k8s.io/kubernetes/pkg/features" proxyutil "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/subpath" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" ) @@ -302,19 +305,31 @@ func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.Persi return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient()) } - pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) - if err != nil { - klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) - return err + var generatedOptions volumetypes.GeneratedOperations + var err error + + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + generatedOptions, err = expc.operationGenerator.GenerateExpandAndRecoverVolumeFunc(pvc, pv, resizerName) + if err != nil { + klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return err + } + } else { + pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) + if err != nil { + klog.Errorf("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return err + } + 
+ generatedOptions, err = expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv) + if err != nil { + klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return err + } } - generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv) - if err != nil { - klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) - return err - } klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) - _, detailedErr := generatedOperations.Run() + _, detailedErr := generatedOptions.Run() return detailedErr } @@ -357,8 +372,15 @@ func (expc *expandController) getPersistentVolume(ctx context.Context, pvc *v1.P // isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage]) - pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage] - return pvcCap.Cmp(pvCap) >= 0 + pvcSpecCap := pvc.Spec.Resources.Requests.Storage() + pvcStatusCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage] + + // since we allow shrinking volumes, we must compare pvc status capacity + // with both pvc spec capacity and pv spec capacity. 
+ if pvcStatusCap.Cmp(*pvcSpecCap) >= 0 && pvcStatusCap.Cmp(pvCap) >= 0 { + return true + } + return false } // Implementing VolumeHost interface diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index cf4ba7a0d48..b5e0ff718d4 100644 --- a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -347,8 +347,12 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp resp, err := nodeClient.NodeExpandVolume(ctx, req) if err != nil { + if !isFinalError(err) { + return opts.newSize, volumetypes.NewUncertainProgressError(err.Error()) + } return opts.newSize, err } + updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI) return *updatedQuantity, nil } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 55352fa9c74..b70d23ac6dd 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -85,6 +85,8 @@ const ( FailVolumeExpansion = "fail-expansion-test" + AlwaysFailNodeExpansion = "always-fail-node-expansion" + deviceNotMounted = "deviceNotMounted" deviceMountUncertain = "deviceMountUncertain" deviceMounted = "deviceMounted" @@ -178,6 +180,7 @@ type FakeVolumePlugin struct { LimitKey string ProvisionDelaySeconds int SupportsRemount bool + DisableNodeExpansion bool // default to false which means it is attachable by default NonAttachable bool @@ -464,13 +467,17 @@ func (plugin *FakeVolumePlugin) ExpandVolumeDevice(spec *Spec, newSize resource. 
} func (plugin *FakeVolumePlugin) RequiresFSResize() bool { - return true + return !plugin.DisableNodeExpansion } func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) { if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName { return false, volumetypes.NewFailedPreconditionError("volume-in-use") } + if resizeOptions.VolumeSpec.Name() == AlwaysFailNodeExpansion { + return false, fmt.Errorf("Test failure: NodeExpand") + } + // Set up fakeVolumePlugin not support STAGE_UNSTAGE for testing the behavior // so as volume can be node published before we can resize if resizeOptions.CSIVolumePhase == volume.CSIVolumeStaged { diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 5513dd60db9..6d8e5a1b3d1 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -104,6 +104,10 @@ func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1. 
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil } +func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFunc"), nil +} + func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil } diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 0c33d347eb9..521cdf99ac0 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -658,6 +658,16 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentV }, nil } +func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) { + opFunc := func() volumetypes.OperationContext { + startOperationAndBlock(fopg.ch, fopg.quit) + return volumetypes.NewOperationContext(nil, nil, false) + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} + func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 4eabca48927..d7cb5121a6c 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -25,6 +25,8 @@ import ( "strings" "time" + 
"k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -150,10 +152,50 @@ type OperationGenerator interface { GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) + GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) + // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) } +type inTreeResizeOpts struct { + resizerName string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + volumeSpec *volume.Spec + volumePlugin volume.ExpandableVolumePlugin +} + +type inTreeResizeResponse struct { + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + err error + + // Indicates whether kubelet should assume resize operation as finished. + // For kubelet - resize operation could be assumed as finished even if + // actual resizing is *not* finished. This can happen, because certain prechecks + // are failing and kubelet should not retry expansion, or it could happen + // because resize operation is genuinely finished. + assumeResizeOpAsFinished bool + + // indicates that resize operation was called on underlying volume driver + // mainly useful for testing. + resizeCalled bool + + // indicates whether entire volume expansion is finished or not + // only used from nodeExpansion calls. Mainly used for testing. 
+ resizeFinished bool +} + +type nodeResizeOperationOpts struct { + vmt VolumeToMount + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + pluginResizeOpts volume.NodeResizeOptions + volumePlugin volume.NodeExpandableVolumePlugin + actualStateOfWorld ActualStateOfWorldMounterUpdater +} + func (og *operationGenerator) GenerateVolumesAreAttachedFunc( attachedVolumes []AttachedVolume, nodeName types.NodeName, @@ -1595,7 +1637,6 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( } expandVolumeFunc := func() volumetypes.OperationContext { - migrated := false newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] @@ -1617,7 +1658,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( // k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be // successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed // until they reflect user requested size in pvc.Status.Size - updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient) + _, updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient) if updateErr != nil { detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr) return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) @@ -1632,7 +1673,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( // reflects user requested size. 
if !volumePlugin.RequiresFSResize() || !fsVolume { klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) - err := util.MarkResizeFinished(pvc, newSize, og.kubeClient) + _, err := util.MarkResizeFinished(pvc, newSize, og.kubeClient) if err != nil { detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated) @@ -1640,7 +1681,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) } else { - err := util.MarkForFSResize(pvc, og.kubeClient) + _, err := util.MarkForFSResize(pvc, og.kubeClient) if err != nil { detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) klog.Warning(detailedErr) @@ -1672,6 +1713,210 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( }, nil } +func (og *operationGenerator) GenerateExpandAndRecoverVolumeFunc( + pvc *v1.PersistentVolumeClaim, + pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) { + + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) + + volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + if err != nil { + return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + } + + if volumePlugin == nil { + return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc)) + } + + expandVolumeFunc := func() volumetypes.OperationContext { + resizeOpts := inTreeResizeOpts{ + pvc: pvc, + pv: pv, + resizerName: 
resizerName, + volumePlugin: volumePlugin, + volumeSpec: volumeSpec, + } + migrated := false + resp := og.expandAndRecoverFunction(resizeOpts) + if resp.err != nil { + return volumetypes.NewOperationContext(resp.err, resp.err, migrated) + } + return volumetypes.NewOperationContext(nil, nil, migrated) + } + + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationName: "expand_volume", + OperationFunc: expandVolumeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"), + }, nil +} + +func (og *operationGenerator) expandAndRecoverFunction(resizeOpts inTreeResizeOpts) inTreeResizeResponse { + pvc := resizeOpts.pvc + pv := resizeOpts.pv + resizerName := resizeOpts.resizerName + volumePlugin := resizeOpts.volumePlugin + volumeSpec := resizeOpts.volumeSpec + + pvcSpecSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] + pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage] + pvSize := pv.Spec.Capacity[v1.ResourceStorage] + + resizeResponse := inTreeResizeResponse{ + pvc: pvc, + pv: pv, + resizeCalled: false, + } + + // by default we are expanding to fulfill the size requested in pvc.Spec.Resources + newSize := pvcSpecSize + resizeStatus := v1.PersistentVolumeClaimNoExpansionInProgress + if pvc.Status.ResizeStatus != nil { + resizeStatus = *pvc.Status.ResizeStatus + } + var allocatedSize *resource.Quantity + t, ok := pvc.Status.AllocatedResources[v1.ResourceStorage] + if ok { + allocatedSize = &t + } + var err error + + if pvSize.Cmp(pvcSpecSize) < 0 { + // pv is not of requested size yet and hence will require expanding + + switch resizeStatus { + case v1.PersistentVolumeClaimControllerExpansionInProgress: + case v1.PersistentVolumeClaimNodeExpansionPending: + case 
v1.PersistentVolumeClaimNodeExpansionInProgress: + case v1.PersistentVolumeClaimNodeExpansionFailed: + if allocatedSize != nil { + newSize = *allocatedSize + } + default: + newSize = pvcSpecSize + } + } else { + // PV has already been expanded and hence we can be here for following reasons: + // 1. If expansion is pending on the node and this was just a spurious update event + // we don't need to do anything and let kubelet handle it. + // 2. It could be that - although we successfully expanded the volume, we failed to + // record our work in API objects, in which case - we should resume resizing operation + // and let API objects be updated. + // 3. Controller successfully expanded the volume, but expansion is failing on the node + // and before kubelet can retry failed node expansion - controller must verify if it is + // safe to do so. + // 4. While expansion was still pending on the node, user reduced the pvc size. + switch resizeStatus { + case v1.PersistentVolumeClaimNodeExpansionInProgress: + case v1.PersistentVolumeClaimNodeExpansionPending: + // we don't need to do any work. We could be here because of a spurious update event. + // This is case #1 + return resizeResponse + case v1.PersistentVolumeClaimNodeExpansionFailed: + // This is case#3 + pvc, err = og.markForPendingNodeExpansion(pvc, pv) + resizeResponse.pvc = pvc + resizeResponse.err = err + return resizeResponse + case v1.PersistentVolumeClaimControllerExpansionInProgress: + case v1.PersistentVolumeClaimControllerExpansionFailed: + case v1.PersistentVolumeClaimNoExpansionInProgress: + // This is case#2 or it could also be case#4 when user manually shrunk the PVC + // after expanding it. + if allocatedSize != nil { + newSize = *allocatedSize + } + default: + // It is impossible for ResizeStatus to be nil and allocatedSize to be not nil but somehow + // if we do end up in this state, it is safest to resume expansion to last recorded size in + // allocatedSize variable. 
+ if pvc.Status.ResizeStatus == nil && allocatedSize != nil { + newSize = *allocatedSize + } else { + newSize = pvcSpecSize + } + } + } + + pvc, err = util.MarkControllerReisizeInProgress(pvc, resizerName, newSize, og.kubeClient) + if err != nil { + msg := fmt.Errorf("error updating pvc %s with resize in progress: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + resizeResponse.err = msg + resizeResponse.pvc = pvc + return resizeResponse + } + + updatedSize, err := volumePlugin.ExpandVolumeDevice(volumeSpec, newSize, pvcStatusSize) + resizeResponse.resizeCalled = true + + if err != nil { + msg := fmt.Errorf("error expanding pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + resizeResponse.err = msg + resizeResponse.pvc = pvc + return resizeResponse + } + + // update PV size + var updateErr error + pv, updateErr = util.UpdatePVSize(pv, updatedSize, og.kubeClient) + // if updating PV failed, we are going to leave the PVC in ControllerExpansionInProgress state, so as expansion can be retried to previously set allocatedSize value. 
+ if updateErr != nil { + msg := fmt.Errorf("error updating pv for pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr) + resizeResponse.err = msg + return resizeResponse + } + resizeResponse.pv = pv + + fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec) + + if !volumePlugin.RequiresFSResize() || !fsVolume { + pvc, err = util.MarkResizeFinished(pvc, updatedSize, og.kubeClient) + if err != nil { + msg := fmt.Errorf("error marking pvc %s as resized: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + resizeResponse.err = msg + return resizeResponse + } + resizeResponse.pvc = pvc + successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg) + } else { + pvc, err = og.markForPendingNodeExpansion(pvc, pv) + resizeResponse.pvc = pvc + if err != nil { + msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + resizeResponse.err = msg + return resizeResponse + } + } + return resizeResponse +} + +func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) { + var err error + pvc, err = util.MarkForFSResize(pvc, og.kubeClient) + if err != nil { + msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) + return pvc, msg + } + // store old PVC capacity in pv, so as if PVC gets deleted while node expansion was pending + // we can restore size of pvc from PV annotation and still perform expansion on the node + oldCapacity := pvc.Status.Capacity[v1.ResourceStorage] + err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient) + if err != nil { + detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, 
util.AnnPreResizeCapacity, oldCapacity.String(), err) + klog.Warning(detailedErr) + return pvc, detailedErr + } + return pvc, nil +} + func (og *operationGenerator) GenerateExpandInUseVolumeFunc( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { @@ -1825,6 +2070,7 @@ func (og *operationGenerator) nodeExpandVolume( if expandableVolumePlugin != nil && expandableVolumePlugin.RequiresFSResize() && volumeToMount.VolumeSpec.PersistentVolume != nil { + pv := volumeToMount.VolumeSpec.PersistentVolume pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) if err != nil { @@ -1832,56 +2078,200 @@ func (og *operationGenerator) nodeExpandVolume( return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err) } - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - if pvcStatusCap.Cmp(pvSpecCap) < 0 { - // File system resize was requested, proceed - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) - - if volumeToMount.VolumeSpec.ReadOnly { - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system") - klog.Warningf(detailedMsg) - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) - return true, nil - } - rsOpts.VolumeSpec = volumeToMount.VolumeSpec - rsOpts.NewSize = pvSpecCap - rsOpts.OldSize = pvcStatusCap - resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) - if resizeErr != nil { - // if driver returned FailedPrecondition error that means - // volume expansion should not be 
retried on this node but - // expansion operation should not block mounting - if volumetypes.IsFailedPreconditionError(resizeErr) { - actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) - return true, nil - } - return false, resizeErr - } - // Volume resizing is not done but it did not error out. This could happen if a CSI volume - // does not have node stage_unstage capability but was asked to resize the volume before - // node publish. In which case - we must retry resizing after node publish. - if !resizeDone { - return false, nil - } - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) - // File system resize succeeded, now update the PVC's Capacity to match the PV's - err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) - if err != nil { - // On retry, NodeExpandVolume will be called again but do nothing - return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) - } + if volumeToMount.VolumeSpec.ReadOnly { + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system") + klog.Warningf(detailedMsg) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) return true, nil } + resizeOp := nodeResizeOperationOpts{ + vmt: volumeToMount, + pvc: pvc, + pv: pv, + pluginResizeOpts: rsOpts, + volumePlugin: expandableVolumePlugin, + actualStateOfWorld: actualStateOfWorld, + } + if 
utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + resizeResponse := og.callNodeExpandOnPlugin(resizeOp) + return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err + } else { + return og.legacyCallNodeExpandOnPlugin(resizeOp) + } } return true, nil } +// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support +// recovery from volume expansion failure. +func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse { + pvc := resizeOp.pvc + pv := resizeOp.pv + volumeToMount := resizeOp.vmt + rsOpts := resizeOp.pluginResizeOpts + actualStateOfWorld := resizeOp.actualStateOfWorld + expandableVolumePlugin := resizeOp.volumePlugin + + var err error + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] + + resizeResponse := inTreeResizeResponse{ + pvc: pvc, + pv: pv, + } + + if permitNodeExpansion(pvc, pv) { + // File system resize was requested, proceed + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) + + rsOpts.VolumeSpec = volumeToMount.VolumeSpec + rsOpts.NewSize = pvSpecCap + rsOpts.OldSize = pvcStatusCap + pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient) + + if err != nil { + msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) + klog.Errorf(msg.Error()) + resizeResponse.err = msg + return resizeResponse + } + + resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) + resizeResponse.resizeCalled = true + + if resizeErr != nil { + if volumetypes.IsOperationFinishedError(resizeErr) { + var markFailedError error + pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient) + // update the pvc with node expansion object + resizeResponse.pvc = 
pvc + resizeResponse.assumeResizeOpAsFinished = true + if markFailedError != nil { + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + } + } + + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) + resizeResponse.assumeResizeOpAsFinished = true + return resizeResponse + } + + resizeResponse.err = resizeErr + return resizeResponse + } + resizeResponse.resizeFinished = resizeDone + + // Volume resizing is not done but it did not error out. This could happen if a CSI volume + // does not have node stage_unstage capability but was asked to resize the volume before + // node publish. In which case - we must retry resizing after node publish. 
+ if !resizeDone { + return resizeResponse + } + + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) + + // File system resize succeeded, now update the PVC's Capacity to match the PV's + pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) + resizeResponse.pvc = pvc + + if err != nil { + resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + // On retry, NodeExpandVolume will be called again but do nothing + return resizeResponse + } + resizeResponse.assumeResizeOpAsFinished = true + return resizeResponse + } + // somehow a resize operation was queued, but we can not perform any resizing because + // prechecks required for node expansion failed. Kubelet should not retry expanding the volume. 
+ resizeResponse.assumeResizeOpAsFinished = true + return resizeResponse +} + +// legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support +// recovery from volume expansion failure +func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) { + pvc := resizeOp.pvc + pv := resizeOp.pv + volumeToMount := resizeOp.vmt + rsOpts := resizeOp.pluginResizeOpts + actualStateOfWorld := resizeOp.actualStateOfWorld + expandableVolumePlugin := resizeOp.volumePlugin + + var err error + + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] + if pvcStatusCap.Cmp(pvSpecCap) < 0 { + // File system resize was requested, proceed + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) + + rsOpts.VolumeSpec = volumeToMount.VolumeSpec + rsOpts.NewSize = pvSpecCap + rsOpts.OldSize = pvcStatusCap + resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) + if resizeErr != nil { + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) + return true, nil + } + return false, resizeErr + } + // Volume resizing is not done but it did not error out. This could happen if a CSI volume + // does not have node stage_unstage capability but was asked to resize the volume before + // node publish. In which case - we must retry resizing after node publish. 
+ if !resizeDone { + return false, nil + } + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) + // File system resize succeeded, now update the PVC's Capacity to match the PV's + _, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) + if err != nil { + // On retry, NodeExpandVolume will be called again but do nothing + return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + } + return true, nil + } + return true, nil +} + +func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] + // if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded + if pvcStatusCap.Cmp(pvSpecCap) >= 0 { + return false + } + + resizeStatus := pvc.Status.ResizeStatus + // if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on + // the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where + // resizeStatus may not be set (old control-plane expansion controller etc). 
+ if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress { + return true + } else { + klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus) + return false + } +} + func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go index 3353510a69f..c7888ef0901 100644 --- a/pkg/volume/util/operationexecutor/operation_generator_test.go +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -17,6 +17,12 @@ limitations under the License. package operationexecutor import ( + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/record" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" "os" "testing" @@ -27,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" fakeclient "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/volume" @@ -111,6 +116,267 @@ func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) { } } +func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) { + var tests = []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + recoverFeatureGate bool + disableNodeExpansion bool + // expectations of test + expectedResizeStatus v1.PersistentVolumeClaimResizeStatus + expectedAllocatedSize resource.Quantity + expectResizeCall bool + }{ + { + name: "pvc.spec.size > 
pv.spec.size, recover_expansion=on", + pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "1G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending, + expectedAllocatedSize: resource.MustParse("2G"), + expectResizeCall: true, + }, + { + name: "pvc.spec.size = pv.spec.size, recover_expansion=on", + pvc: getTestPVC("test-vol0", "1G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "1G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending, + expectedAllocatedSize: resource.MustParse("1G"), + expectResizeCall: true, + }, + { + name: "pvc.spec.size = pv.spec.size, recover_expansion=on", + pvc: getTestPVC("test-vol0", "1G", "1G", "1G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV("test-vol0", "1G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending, + expectedAllocatedSize: resource.MustParse("1G"), + expectResizeCall: false, + }, + { + name: "pvc.spec.size > pv.spec.size, recover_expansion=on, disable_node_expansion=true", + pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "1G"), + disableNodeExpansion: true, + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + expectedAllocatedSize: resource.MustParse("2G"), + expectResizeCall: true, + }, + { + name: "pv.spec.size >= pvc.spec.size, recover_expansion=on, resize_status=node_expansion_failed", + pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending, + expectedAllocatedSize: resource.MustParse("2G"), + expectResizeCall: false, + }, + } + for i := range tests { + test := 
tests[i] + t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + fakePlugin.DisableNodeExpansion = test.disableNodeExpansion + pvc := test.pvc + pv := test.pv + og := getTestOperationGenerator(volumePluginMgr, pvc, pv) + rsOpts := inTreeResizeOpts{ + pvc: pvc, + pv: pv, + resizerName: fakePlugin.GetPluginName(), + volumePlugin: fakePlugin, + } + ogInstance, _ := og.(*operationGenerator) + + expansionResponse := ogInstance.expandAndRecoverFunction(rsOpts) + if expansionResponse.err != nil { + t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: %v", expansionResponse.err) + } + updatedPVC := expansionResponse.pvc + assert.Equal(t, *updatedPVC.Status.ResizeStatus, test.expectedResizeStatus) + actualAllocatedSize := updatedPVC.Status.AllocatedResources.Storage() + if test.expectedAllocatedSize.Cmp(*actualAllocatedSize) != 0 { + t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: expected allocated size %s, got %s", test.expectedAllocatedSize.String(), actualAllocatedSize.String()) + } + if test.expectResizeCall != expansionResponse.resizeCalled { + t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: expected resize called %t, got %t", test.expectResizeCall, expansionResponse.resizeCalled) + } + }) + } +} + +func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) { + var tests = []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + recoverFeatureGate bool + + // expectations of test + expectedResizeStatus v1.PersistentVolumeClaimResizeStatus + expectedStatusSize resource.Quantity + expectResizeCall bool + assumeResizeOpAsFinished bool + expectError bool + }{ + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", + pvc: getTestPVC("test-vol0", "2G", "1G", "", 
v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + recoverFeatureGate: true, + + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: false, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", + pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV("test-vol0", "2G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", + pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + recoverFeatureGate: true, + expectError: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + } + + for i := range tests { + test := tests[i] + t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + + pvc := test.pvc + pv := test.pv + pod := getTestPod("test-pod", pvc.Name) + og := getTestOperationGenerator(volumePluginMgr, pvc, pv) + + vmt := VolumeToMount{ + Pod: pod, + VolumeName: v1.UniqueVolumeName(pv.Name), + VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), + } + resizeOp := nodeResizeOperationOpts{ + pvc: pvc, + pv: pv, + volumePlugin: fakePlugin, + vmt: vmt, + actualStateOfWorld: nil, + } + ogInstance, _ := 
og.(*operationGenerator) + expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp) + + pvc = expansionResponse.pvc + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + + if !test.expectError && expansionResponse.err != nil { + t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err) + } + if test.expectError && expansionResponse.err == nil { + t.Errorf("For test %s, expected error but got none", test.name) + } + + if test.expectResizeCall != expansionResponse.resizeCalled { + t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled) + } + if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished { + t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished) + } + if test.expectedResizeStatus != *pvc.Status.ResizeStatus { + t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus) + } + if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { + t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) + } + }) + } +} + +func getTestPod(podName, pvcName string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + UID: "test-pod-uid", + Namespace: "ns", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: pvcName, + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcName, + }, + }, + }, + }, + }, + } +} + +func getTestPVC(volumeName string, specSize, statusSize, allocatedSize string, resizeStatus v1.PersistentVolumeClaimResizeStatus) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "claim01", + Namespace: "ns", + UID: "test-uid", + }, + Spec: v1.PersistentVolumeClaimSpec{ 
+ AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse(specSize)}}, + VolumeName: volumeName, + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + } + if len(statusSize) > 0 { + pvc.Status.Capacity = v1.ResourceList{v1.ResourceStorage: resource.MustParse(statusSize)} + } + if len(allocatedSize) > 0 { + pvc.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: resource.MustParse(allocatedSize)} + } + if len(resizeStatus) > 0 { + pvc.Status.ResizeStatus = &resizeStatus + } + return pvc +} + +func getTestPV(volumeName string, specSize string) *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeName, + UID: "test-uid", + }, + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Capacity: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse(specSize), + }, + }, + Status: v1.PersistentVolumeStatus{ + Phase: v1.VolumeBound, + }, + } +} + func findMetricWithNameAndLabels(metricFamilyName string, labelFilter map[string]string) *io_prometheus_client.Metric { metricFamily := getMetricFamily(metricFamilyName) if metricFamily == nil { @@ -145,8 +411,8 @@ func isLabelsMatchWithMetric(labelFilter map[string]string, metric *io_prometheu return true } -func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr) OperationGenerator { - fakeKubeClient := fakeclient.NewSimpleClientset() +func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr, objects ...runtime.Object) OperationGenerator { + fakeKubeClient := fakeclient.NewSimpleClientset(objects...) 
fakeRecorder := &record.FakeRecorder{} fakeHandler := volumetesting.NewBlockVolumePathHandler() operationGenerator := NewOperationGenerator( diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 195819d1408..4368b5fec7f 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -28,7 +28,9 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/mount-utils" @@ -61,7 +63,7 @@ func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string { func UpdatePVSize( pv *v1.PersistentVolume, newSize resource.Quantity, - kubeClient clientset.Interface) error { + kubeClient clientset.Interface) (*v1.PersistentVolume, error) { pvClone := pv.DeepCopy() pvClone.Spec.Capacity[v1.ResourceStorage] = newSize @@ -84,7 +86,8 @@ func AddAnnPreResizeCapacity( } pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String() - return PatchPV(pv, pvClone, kubeClient) + _, err := PatchPV(pv, pvClone, kubeClient) + return err } // DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv @@ -97,35 +100,35 @@ func DeleteAnnPreResizeCapacity( } pvClone := pv.DeepCopy() delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity) - - return PatchPV(pv, pvClone, kubeClient) + _, err := PatchPV(pv, pvClone, kubeClient) + return err } // PatchPV creates and executes a patch for pv func PatchPV( oldPV *v1.PersistentVolume, newPV *v1.PersistentVolume, - kubeClient clientset.Interface) error { + kubeClient clientset.Interface) (*v1.PersistentVolume, error) { oldData, err := json.Marshal(oldPV) if err != nil { - return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err) + 
return oldPV, fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err) } newData, err := json.Marshal(newPV) if err != nil { - return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err) + return oldPV, fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err) } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV) if err != nil { - return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err) + return oldPV, fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err) } - _, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + updatedPV, err := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err) + return oldPV, fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err) } - return nil + return updatedPV, nil } // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress @@ -147,6 +150,23 @@ func MarkResizeInProgressWithResizer( return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) } +func MarkControllerReisizeInProgress(pvc *v1.PersistentVolumeClaim, resizerName string, newSize resource.Quantity, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + // Mark PVC as Resize Started + progressCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimResizing, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + controllerExpansionInProgress := v1.PersistentVolumeClaimControllerExpansionInProgress + conditions := []v1.PersistentVolumeClaimCondition{progressCondition} + newPVC := pvc.DeepCopy() + newPVC = 
MergeResizeConditionOnPVC(newPVC, conditions) + newPVC.Status.ResizeStatus = &controllerExpansionInProgress + newPVC.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: newSize} + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + // SetClaimResizer sets resizer annotation on PVC func SetClaimResizer( pvc *v1.PersistentVolumeClaim, @@ -168,7 +188,7 @@ func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.Persisten // MarkForFSResize marks file system resizing as pending func MarkForFSResize( pvc *v1.PersistentVolumeClaim, - kubeClient clientset.Interface) error { + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { pvcCondition := v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimFileSystemResizePending, Status: v1.ConditionTrue, @@ -177,16 +197,20 @@ func MarkForFSResize( } conditions := []v1.PersistentVolumeClaimCondition{pvcCondition} newPVC := pvc.DeepCopy() + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + expansionPendingOnNode := v1.PersistentVolumeClaimNodeExpansionPending + newPVC.Status.ResizeStatus = &expansionPendingOnNode + } newPVC = MergeResizeConditionOnPVC(newPVC, conditions) - _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) - return err + updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) + return updatedPVC, err } // MarkResizeFinished marks all resizing as done func MarkResizeFinished( pvc *v1.PersistentVolumeClaim, newSize resource.Quantity, - kubeClient clientset.Interface) error { + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { return MarkFSResizeFinished(pvc, newSize, kubeClient) } @@ -194,12 +218,65 @@ func MarkResizeFinished( func MarkFSResizeFinished( pvc *v1.PersistentVolumeClaim, newSize resource.Quantity, - kubeClient clientset.Interface) error { + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { newPVC := 
pvc.DeepCopy() + newPVC.Status.Capacity[v1.ResourceStorage] = newSize + + // if RecoverVolumeExpansionFailure is enabled, we need to reset ResizeStatus back to nil + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + expansionFinished := v1.PersistentVolumeClaimNoExpansionInProgress + newPVC.Status.ResizeStatus = &expansionFinished + } + newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{}) - _, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) - return err + updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) + return updatedPVC, err +} + +func MarkControllerExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + expansionFailedOnController := v1.PersistentVolumeClaimControllerExpansionFailed + newPVC := pvc.DeepCopy() + newPVC.Status.ResizeStatus = &expansionFailedOnController + patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */) + if err != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err) + } + + updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace). + Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if updateErr != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr) + } + return updatedClaim, nil +} + +// MarkNodeExpansionFailed marks a PVC for node expansion as failed. Kubelet should not retry expansion +// of volumes which are in failed state. 
+func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + expansionFailedOnNode := v1.PersistentVolumeClaimNodeExpansionFailed + newPVC := pvc.DeepCopy() + newPVC.Status.ResizeStatus = &expansionFailedOnNode + patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */) + if err != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err) + } + + updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace). + Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if updateErr != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr) + } + return updatedClaim, nil +} + +// MarkNodeExpansionInProgress marks pvc expansion in progress on node +func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + nodeExpansionInProgress := v1.PersistentVolumeClaimNodeExpansionInProgress + newPVC := pvc.DeepCopy() + newPVC.Status.ResizeStatus = &nodeExpansionInProgress + updatedPVC, err := PatchPVCStatus(pvc /* oldPVC */, newPVC, kubeClient) + return updatedPVC, err } // PatchPVCStatus updates PVC status using PATCH verb @@ -210,22 +287,22 @@ func PatchPVCStatus( oldPVC *v1.PersistentVolumeClaim, newPVC *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { - patchBytes, err := createPVCPatch(oldPVC, newPVC) + patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */) if err != nil { - return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err) + return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err) } updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace). 
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") if updateErr != nil { - return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr) + return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr) } return updatedClaim, nil } func createPVCPatch( oldPVC *v1.PersistentVolumeClaim, - newPVC *v1.PersistentVolumeClaim) ([]byte, error) { + newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) { oldData, err := json.Marshal(oldPVC) if err != nil { return nil, fmt.Errorf("failed to marshal old data: %v", err) @@ -241,9 +318,11 @@ func createPVCPatch( return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err) } - patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion) - if err != nil { - return nil, fmt.Errorf("failed to add resource version: %v", err) + if addResourceVersionCheck { + patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion) + if err != nil { + return nil, fmt.Errorf("failed to add resource version: %v", err) + } } return patchBytes, nil diff --git a/pkg/volume/util/resize_util_test.go b/pkg/volume/util/resize_util_test.go index ee5fd46c9ed..912673d1bc7 100644 --- a/pkg/volume/util/resize_util_test.go +++ b/pkg/volume/util/resize_util_test.go @@ -155,7 +155,7 @@ func TestCreatePVCPatch(t *testing.T) { pvc2.Status.Capacity = v1.ResourceList{ v1.ResourceName("size"): resource.MustParse("10G"), } - patchBytes, err := createPVCPatch(pvc1, pvc2) + patchBytes, err := createPVCPatch(pvc1, pvc2, true /* addResourceVersionCheck */) if err != nil { t.Errorf("error creating patch bytes %v", err) }