diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 180f1a7b6ff..e848bd87e10 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -582,8 +582,9 @@ func (asw *actualStateOfWorld) GetAttachState( return AttachStateDetached } +// SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller. func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { - klog.V(5).Infof("doing nothing") + klog.V(5).Infof("NO-OP") } func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index a6b31f8c56b..0d0164156f9 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -107,7 +107,7 @@ type ActualStateOfWorld interface { // volumes, depend on this to update the contents of the volume. // All volume mounting calls should be idempotent so a second mount call for // volumes that do not need to update contents should not fail. - PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize *resource.Quantity) (bool, string, error) + PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) // PodRemovedFromVolume returns true if the given pod does not exist in the list of // mountedPods for the given volume in the cache, indicating that the pod has @@ -161,11 +161,6 @@ type ActualStateOfWorld interface { // no longer referenced and may be globally unmounted and detached. GetUnmountedVolumes() []AttachedVolume - // MarkFSResizeRequired marks each volume that is successfully attached and - // mounted for the specified pod as requiring file system resize (if the plugin for the - // volume indicates it requires file system resize). - MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) - // GetAttachedVolumes returns a list of volumes that is known to be attached // to the node. This list can be used to determine volumes that are either in-use // or have a mount/unmount operation pending. @@ -329,10 +324,6 @@ type mountedPod struct { // volumeGidValue contains the value of the GID annotation, if present. volumeGidValue string - // fsResizeRequired indicates the underlying volume has been successfully - // mounted to this pod but its size has been expanded after that. - fsResizeRequired bool - // volumeMountStateForPod stores state of volume mount for the pod. if it is: // - VolumeMounted: means volume for pod has been successfully mounted // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted @@ -552,30 +543,17 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M return nil } -func (asw *actualStateOfWorld) MarkVolumeAsResized( - podName volumetypes.UniquePodName, - volumeName v1.UniqueVolumeName) error { +func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool { asw.Lock() defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - return fmt.Errorf( - "no volume with the name %q exists in the list of attached volumes", - volumeName) + volumeObj, ok := asw.attachedVolumes[volumeName] + if ok { + volumeObj.persistentVolumeSize = claimSize + asw.attachedVolumes[volumeName] = volumeObj + return true } - - podObj, podExists := volumeObj.mountedPods[podName] - if !podExists { - return fmt.Errorf( - "no pod with the name %q exists in the mounted pods list of volume %s", - podName, - volumeName) - } - klog.V(5).InfoS("Pod volume has been resized", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName) - podObj.fsResizeRequired = false - asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - return nil + return false } func (asw *actualStateOfWorld) MarkRemountRequired( @@ -600,40 +578,6 @@ func (asw *actualStateOfWorld) MarkRemountRequired( } } -func (asw *actualStateOfWorld) MarkFSResizeRequired( - volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName) { - asw.Lock() - defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - klog.InfoS("MarkFSResizeRequired for volume failed as volume does not exist", "volumeName", volumeName) - return - } - - podObj, podExists := volumeObj.mountedPods[podName] - if !podExists { - klog.InfoS("MarkFSResizeRequired for volume failed because the pod does not exist", "uniquePodName", podName, "volumeName", volumeName) - return - } - - volumePlugin, err := - asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec) - if err != nil || volumePlugin == nil { - // Log and continue processing - klog.ErrorS(nil, "MarkFSResizeRequired failed to find expandable plugin for volume", "uniquePodName", podObj.podName, "volumeName", volumeObj.volumeName, "volumeSpecName", podObj.volumeSpec.Name()) - return - } - - if volumePlugin.RequiresFSResize() { - if !podObj.fsResizeRequired { - klog.V(3).InfoS("PVC volume of the pod requires file system resize", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName) - podObj.fsResizeRequired = true - } - asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - } -} - func (asw *actualStateOfWorld) SetDeviceMountState( volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error { asw.Lock() @@ -708,10 +652,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro return nil } -func (asw *actualStateOfWorld) PodExistsInVolume( - podName volumetypes.UniquePodName, - volumeName v1.UniqueVolumeName, - desiredVolumeSize *resource.Quantity) (bool, string, error) { +func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) { asw.RLock() defer asw.RUnlock() @@ -729,36 +670,40 @@ func (asw *actualStateOfWorld) PodExistsInVolume( if podObj.remountRequired { return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } - if asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize) { - return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) + if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume { + return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize) } } return podExists, volumeObj.devicePath, nil } -func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize *resource.Quantity) bool { - if volumeObj.volumeInUseErrorForExpansion { - return false +func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) { + currentSize := resource.Quantity{} + if volumeObj.persistentVolumeSize != nil { + currentSize = volumeObj.persistentVolumeSize.DeepCopy() } - if volumeObj.persistentVolumeSize == nil || desiredVolumeSize == nil { - return false + if volumeObj.volumeInUseErrorForExpansion { + return currentSize, false + } + if volumeObj.persistentVolumeSize == nil || desiredVolumeSize.IsZero() { + return currentSize, false } if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 { volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) if err != nil || volumePlugin == nil { // Log and continue processing - klog.Errorf( - "PodExistsInVolume failed to find expandable plugin volume: %q (volSpecName: %q)", - volumeObj.volumeName, volumeObj.spec.Name()) - return false + klog.InfoS("PodExistsInVolume failed to find expandable plugin", + "volume", volumeObj.volumeName, + "volumeSpecName", volumeObj.spec.Name()) + return currentSize, false } if volumePlugin.RequiresFSResize() { - return true + return currentSize, true } } - return false + return currentSize, false } func (asw *actualStateOfWorld) PodRemovedFromVolume( @@ -1005,29 +950,31 @@ func newRemountRequiredError( // fsResizeRequiredError is an error returned when PodExistsInVolume() found // volume/pod attached/mounted but fsResizeRequired was true, indicating the // given volume receives an resize request after attached/mounted. -type fsResizeRequiredError struct { - volumeName v1.UniqueVolumeName - podName volumetypes.UniquePodName +type FsResizeRequiredError struct { + CurrentSize resource.Quantity + volumeName v1.UniqueVolumeName + podName volumetypes.UniquePodName } -func (err fsResizeRequiredError) Error() string { +func (err FsResizeRequiredError) Error() string { return fmt.Sprintf( "volumeName %q mounted to %q needs to resize file system", err.volumeName, err.podName) } func newFsResizeRequiredError( - volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error { - return fsResizeRequiredError{ - volumeName: volumeName, - podName: podName, + volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, currentSize resource.Quantity) error { + return FsResizeRequiredError{ + CurrentSize: currentSize, + volumeName: volumeName, + podName: podName, } } // IsFSResizeRequiredError returns true if the specified error is a // fsResizeRequiredError. func IsFSResizeRequiredError(err error) bool { - _, ok := err.(fsResizeRequiredError) + _, ok := err.(FsResizeRequiredError) return ok } diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 6f68cf5574f..cd9fe6c52c8 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "k8s.io/apimachinery/pkg/api/resource" "testing" "github.com/stretchr/testify/require" @@ -676,7 +677,7 @@ func TestUncertainVolumeMounts(t *testing.T) { t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name()) } - volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, nil) + volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{}) if volExists { t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) } @@ -762,7 +763,7 @@ func verifyPodExistsInVolumeAsw( expectedDevicePath string, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(expectedPodName, expectedVolumeName, nil) + asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{}) if err != nil { t.Fatalf( "ASW PodExistsInVolume failed. Expected: Actual: <%v>", err) @@ -804,7 +805,7 @@ func verifyPodDoesntExistInVolumeAsw( expectVolumeToExist bool, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(podToCheck, volumeToCheck, nil) + asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}) if !expectVolumeToExist && err == nil { t.Fatalf( "ASW PodExistsInVolume did not return error. Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go index 06b8caf5623..60f1a56852d 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go @@ -135,7 +135,6 @@ type DesiredStateOfWorld interface { // be mounted to PodName. type VolumeToMount struct { operationexecutor.VolumeToMount - PersistentVolumeSize *resource.Quantity } // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld. @@ -435,23 +434,25 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount { volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */) for volumeName, volumeObj := range dsw.volumesToMount { for podName, podObj := range volumeObj.podsToMount { - volumesToMount = append( - volumesToMount, - VolumeToMount{ - VolumeToMount: operationexecutor.VolumeToMount{ - VolumeName: volumeName, - PodName: podName, - Pod: podObj.pod, - VolumeSpec: podObj.volumeSpec, - PluginIsAttachable: volumeObj.pluginIsAttachable, - PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable, - OuterVolumeSpecName: podObj.outerVolumeSpecName, - VolumeGidValue: volumeObj.volumeGidValue, - ReportedInUse: volumeObj.reportedInUse, - MountRequestTime: podObj.mountRequestTime, - DesiredSizeLimit: volumeObj.desiredSizeLimit}, - PersistentVolumeSize: volumeObj.persistentVolumeSize, - }) + vmt := VolumeToMount{ + VolumeToMount: operationexecutor.VolumeToMount{ + VolumeName: volumeName, + PodName: podName, + Pod: podObj.pod, + VolumeSpec: podObj.volumeSpec, + PluginIsAttachable: volumeObj.pluginIsAttachable, + PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable, + OuterVolumeSpecName: podObj.outerVolumeSpecName, + VolumeGidValue: volumeObj.volumeGidValue, + ReportedInUse: volumeObj.reportedInUse, + MountRequestTime: podObj.mountRequestTime, + DesiredSizeLimit: volumeObj.desiredSizeLimit, + }, + } + if volumeObj.persistentVolumeSize != nil { + vmt.PersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy() + } + volumesToMount = append(volumesToMount, vmt) } } return volumesToMount diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index aa8d3eb0d68..ccbf21a124c 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -977,7 +977,7 @@ func TestCheckVolumeFSResize(t *testing.T) { }, verify: func(t *testing.T, vols []v1.UniqueVolumeName, volName v1.UniqueVolumeName) { if len(vols) == 0 { - t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") + t.Fatalf("Requested resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") } if len(vols) != 1 { t.Errorf("Some unexpected volumes are marked as fsResizeRequired: %v", vols) @@ -1053,7 +1053,7 @@ func TestCheckVolumeFSResize(t *testing.T) { func() { tc.resize(t, pv, pvc, dswp) - resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage()) tc.verify(t, resizeRequiredVolumes, uniqueVolumeName) }() @@ -1099,16 +1099,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te } func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, - dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { + dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { dswp.ReprocessPod(uniquePodName) dswp.findAndAddNewPods() - return getResizeRequiredVolumes(dsw, asw) + return getResizeRequiredVolumes(dsw, asw, newSize) } -func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { +func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { resizeRequiredVolumes := []v1.UniqueVolumeName{} for _, volumeToMount := range dsw.GetVolumesToMount() { - _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, nil) + _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize) if cache.IsFSResizeRequiredError(err) { resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index a1b258f10ac..c9af55e62c2 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -28,6 +28,8 @@ import ( "path/filepath" "time" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" "k8s.io/mount-utils" utilpath "k8s.io/utils/path" @@ -203,14 +205,15 @@ func (rc *reconciler) mountOrAttachVolumes() { } else if !volMounted || cache.IsRemountRequiredError(err) { rc.mountAttachedVolumes(volumeToMount, err) } else if cache.IsFSResizeRequiredError(err) { - rc.expandVolume(volumeToMount) + fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError) + rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize) } } } -func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount) { +func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) { klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld) + err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize) if err != nil && !isExpectedError(err) { klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 4b5759f243f..5d3d5480bf1 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -1284,10 +1284,11 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize volumeSpec = &volume.Spec{PersistentVolume: pvWithSize} dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // mark volume as resize required - asw.MarkFSResizeRequired(volumeName, podName) + t.Logf("Changing size of the volume to %s", tc.newPVSize.String()) + newSize := tc.newPVSize.DeepCopy() + dsw.UpdatePersistentVolumeSize(volumeName, &newSize) - _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, nil) + _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize) if tc.expansionFailed { if cache.IsFSResizeRequiredError(podExistErr) { t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) @@ -1299,7 +1300,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { go reconciler.Run(wait.NeverStop) waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize) return mounted && err == nil, nil }) if waitErr != nil { @@ -1791,7 +1792,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN err := retryWithExponentialBackOff( testOperationBackOffDuration, func() (bool, error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}) if mounted || err != nil { return false, nil } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 47234e99de4..85d5767858d 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -495,7 +495,6 @@ func (spec *Spec) IsKubeletExpandable() bool { return spec.PersistentVolume.Spec.FlexVolume != nil default: return false - } } diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 6d8e5a1b3d1..88d980e711d 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -17,6 +17,7 @@ limitations under the License. package operationexecutor import ( + "k8s.io/apimachinery/pkg/api/resource" "time" v1 "k8s.io/api/core/v1" @@ -108,7 +109,7 @@ func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeC return f.recordFuncCall("GenerateExpandVolumeFunc"), nil } -func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil } diff --git a/pkg/volume/util/operationexecutor/node_expander.go b/pkg/volume/util/operationexecutor/node_expander.go new file mode 100644 index 00000000000..a10f0af030d --- /dev/null +++ b/pkg/volume/util/operationexecutor/node_expander.go @@ -0,0 +1,149 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "fmt" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + kevents "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +type NodeExpander struct { + nodeResizeOperationOpts + kubeClient clientset.Interface + recorder record.EventRecorder + + // computed via precheck + pvcStatusCap resource.Quantity + pvCap resource.Quantity + resizeStatus *v1.PersistentVolumeClaimResizeStatus + + // pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet + // PVC has already been updated - possibly because expansion already succeeded on different node. + // This can happen when a RWX PVC is expanded. + pvcAlreadyUpdated bool +} + +func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander { + return &NodeExpander{ + kubeClient: client, + nodeResizeOperationOpts: resizeOp, + recorder: recorder, + } +} + +// testResponseData is merely used for doing sanity checks in unit tests +type testResponseData struct { + // indicates that resize operation was called on underlying volume driver + // mainly useful for testing. + resizeCalledOnPlugin bool + + // Indicates whether kubelet should assume resize operation as finished. + // For kubelet - resize operation could be assumed as finished even if + // actual resizing is *not* finished. This can happen, because certain prechecks + // are failing and kubelet should not retry expansion, or it could happen + // because resize operation is genuinely finished. + assumeResizeFinished bool +} + +// runPreCheck performs some sanity checks before expansion can be performed on the PVC. +func (ne *NodeExpander) runPreCheck() bool { + ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage] + ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage] + + ne.resizeStatus = ne.pvc.Status.ResizeStatus + + // PVC is already expanded but we are still trying to expand the volume because + // last recorded size in ASOW is older. This can happen for RWX volume types. + if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && (ne.resizeStatus == nil || *ne.resizeStatus == v1.PersistentVolumeClaimNoExpansionInProgress) { + ne.pvcAlreadyUpdated = true + } + + // if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we + // should allow volume expansion on the node to proceed. We are making an exception for + // resizeStatus being nil because it will support use cases where + // resizeStatus may not be set (old control-plane expansion controller etc). + if ne.resizeStatus == nil || + ne.pvcAlreadyUpdated || + *ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || + *ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress { + return true + } + + return false +} + +func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) { + allowExpansion := ne.runPreCheck() + if !allowExpansion { + return false, nil, testResponseData{false, true} + } + + var err error + + if !ne.pvcAlreadyUpdated { + ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient) + + if err != nil { + msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) + klog.Errorf(msg.Error()) + return false, err, testResponseData{} + } + } + _, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts) + if resizeErr != nil { + if volumetypes.IsOperationFinishedError(resizeErr) { + var markFailedError error + ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient) + if markFailedError != nil { + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + } + } + + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName) + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) + return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} + } + return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} + } + simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") + ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod)) + + // no need to update PVC object if we already updated it + if ne.pvcAlreadyUpdated { + return true, nil, testResponseData{true, true} + } + + // File system resize succeeded, now update the PVC's Capacity to match the PV's + ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient) + if err != nil { + return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), testResponseData{true, true} + } + return true, nil, testResponseData{true, true} +} diff --git a/pkg/volume/util/operationexecutor/node_expander_test.go b/pkg/volume/util/operationexecutor/node_expander_test.go new file mode 100644 index 00000000000..135d94a78cb --- /dev/null +++ b/pkg/volume/util/operationexecutor/node_expander_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "testing" +) + +func TestNodeExpander(t *testing.T) { + var tests = []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + recoverFeatureGate bool + + // expectations of test + expectedResizeStatus v1.PersistentVolumeClaimResizeStatus + expectedStatusSize resource.Quantity + expectResizeCall bool + assumeResizeOpAsFinished bool + expectError bool + }{ + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", + pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + recoverFeatureGate: true, + + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: false, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", + pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV("test-vol0", "2G"), + recoverFeatureGate: true, + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", + pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + recoverFeatureGate: true, + expectError: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + } + + for i := range tests { + test := tests[i] + t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + + pvc := test.pvc + pv := test.pv + pod := getTestPod("test-pod", pvc.Name) + og := getTestOperationGenerator(volumePluginMgr, pvc, pv) + + vmt := VolumeToMount{ + Pod: pod, + VolumeName: v1.UniqueVolumeName(pv.Name), + VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), + } + resizeOp := nodeResizeOperationOpts{ + pvc: pvc, + pv: pv, + volumePlugin: fakePlugin, + vmt: vmt, + actualStateOfWorld: nil, + pluginResizeOpts: volume.NodeResizeOptions{ + VolumeSpec: vmt.VolumeSpec, + NewSize: *pv.Spec.Capacity.Storage(), + OldSize: *pvc.Status.Capacity.Storage(), + }, + } + ogInstance, _ := og.(*operationGenerator) + nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder) + + _, err, expansionResponse := nodeExpander.expandOnPlugin() + + pvc = nodeExpander.pvc + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + + if !test.expectError && err != nil { + t.Errorf("For test %s, expected no error got: %v", test.name, err) + } + if test.expectError && err == nil { + t.Errorf("For test %s, expected error but got none", test.name) + } + + if test.expectResizeCall != expansionResponse.resizeCalledOnPlugin { + t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalledOnPlugin) + } + if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeFinished { + t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeFinished) + } + if test.expectedResizeStatus != *pvc.Status.ResizeStatus { + t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus) + } + if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { + t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) + } + }) + } +} diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 1f0d713287b..b50314c913c 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -148,7 +148,7 @@ type OperationExecutor interface { // and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. - ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error + ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error) // CheckVolumeExistenceOperation checks volume existence @@ -201,7 +201,7 @@ type ActualStateOfWorldMounterUpdater interface { MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error // Marks the specified volume's file system resize request is finished. - MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error + MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool // GetDeviceMountState returns mount state of the device in global path GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState @@ -423,6 +423,10 @@ type VolumeToMount struct { // time at which volume was requested to be mounted MountRequestTime time.Time + + // PersistentVolumeSize stores desired size of the volume. + // usually this is the size if pv.Spec.Capacity + PersistentVolumeSize resource.Quantity } // DeviceMountState represents device mount state in a global path. @@ -997,8 +1001,8 @@ func (oe *operationExecutor) UnmountDevice( deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) } -func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld) +func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error { + generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize) if err != nil { return err } diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 521cdf99ac0..59f985de964 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -18,6 +18,7 @@ package operationexecutor import ( "fmt" + "k8s.io/apimachinery/pkg/api/resource" "strconv" "testing" "time" @@ -668,7 +669,7 @@ func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.P }, nil } -func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) return volumetypes.NewOperationContext(nil, nil, false) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 4c9637089cb..44925d5e9d5 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -87,6 +87,16 @@ type operationGenerator struct { translator InTreeToCSITranslator } +type inTreeResizeResponse struct { + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + + err error + // indicates that resize operation was called on underlying volume driver + // mainly useful for testing. + resizeCalled bool +} + // NewOperationGenerator is returns instance of operationGenerator func NewOperationGenerator(kubeClient clientset.Interface, volumePluginMgr *volume.VolumePluginMgr, @@ -150,7 +160,7 @@ type OperationGenerator interface { GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. - GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) + GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) } type inTreeResizeOpts struct { @@ -161,27 +171,6 @@ type inTreeResizeOpts struct { volumePlugin volume.ExpandableVolumePlugin } -type inTreeResizeResponse struct { - pvc *v1.PersistentVolumeClaim - pv *v1.PersistentVolume - err error - - // Indicates whether kubelet should assume resize operation as finished. - // For kubelet - resize operation could be assumed as finished even if - // actual resizing is *not* finished. This can happen, because certain prechecks - // are failing and kubelet should not retry expansion, or it could happen - // because resize operation is genuinely finished. - assumeResizeOpAsFinished bool - - // indicates that resize operation was called on underlying volume driver - // mainly useful for testing. - resizeCalled bool - - // indicates whether entire volume expansion is finished or not - // only used from nodeExpansion calls. Mainly used for testing. - resizeFinished bool -} - type nodeResizeOperationOpts struct { vmt VolumeToMount pvc *v1.PersistentVolumeClaim @@ -712,7 +701,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) resizeOptions.DeviceMountPath = volumeMounter.GetPath() - _, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) + _, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -1205,7 +1194,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( DevicePath: devicePath, DeviceStagePath: stagingPath, } - _, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) + _, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) @@ -1910,7 +1899,7 @@ func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolu func (og *operationGenerator) GenerateExpandInUseVolumeFunc( volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) @@ -1923,9 +1912,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( var eventErr, detailedErr error migrated := false + if currentSize.IsZero() || volumeToMount.PersistentVolumeSize.IsZero() { + err := fmt.Errorf("current or new size of the volume is not set") + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + resizeOptions := volume.NodeResizeOptions{ VolumeSpec: volumeToMount.VolumeSpec, DevicePath: volumeToMount.DevicePath, + OldSize: currentSize, + NewSize: volumeToMount.PersistentVolumeSize, } fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) if err != nil { @@ -2027,10 +2024,11 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, return false, e1, e2 } if resizeDone { - markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) - if markFSResizedErr != nil { + markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize) + if !markingDone { // On failure, return error. Caller will log and retry. - e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr) + genericFailureError := fmt.Errorf("unable to mark volume as resized") + e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError) return false, e1, e2 } return true, nil, nil @@ -2038,25 +2036,9 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, return false, nil, nil } -func (og *operationGenerator) nodeExpandVolume( - volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater, - rsOpts volume.NodeResizeOptions) (bool, error) { - - if volumeToMount.VolumeSpec != nil && - volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { - klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName) - return true, nil - } - - // Get expander, if possible - expandableVolumePlugin, _ := - og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec) - - if expandableVolumePlugin != nil && - expandableVolumePlugin.RequiresFSResize() && - volumeToMount.VolumeSpec.PersistentVolume != nil { - +func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) { + supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount) + if supportsExpansion { pv := volumeToMount.VolumeSpec.PersistentVolume pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) if err != nil { @@ -2071,193 +2053,143 @@ func (og *operationGenerator) nodeExpandVolume( og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) return true, nil } - resizeOp := nodeResizeOperationOpts{ - vmt: volumeToMount, - pvc: pvc, - pv: pv, - pluginResizeOpts: rsOpts, - volumePlugin: expandableVolumePlugin, - actualStateOfWorld: actualStateOfWorld, - } - if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { - resizeResponse := og.callNodeExpandOnPlugin(resizeOp) - return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err - } else { - return og.legacyCallNodeExpandOnPlugin(resizeOp) + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] + if pvcStatusCap.Cmp(pvSpecCap) < 0 { + rsOpts.NewSize = pvSpecCap + rsOpts.OldSize = pvcStatusCap + resizeOp := nodeResizeOperationOpts{ + vmt: volumeToMount, + pvc: pvc, + pv: pv, + pluginResizeOpts: rsOpts, + volumePlugin: expandablePlugin, + actualStateOfWorld: actualStateOfWorld, + } + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) + resizeFinished, err, _ := nodeExpander.expandOnPlugin() + return resizeFinished, err + } else { + return og.legacyCallNodeExpandOnPlugin(resizeOp) + } } } return true, nil } -// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support -// recovery from volume expansion failure. -func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse { - pvc := resizeOp.pvc - pv := resizeOp.pv - volumeToMount := resizeOp.vmt - rsOpts := resizeOp.pluginResizeOpts - actualStateOfWorld := resizeOp.actualStateOfWorld - expandableVolumePlugin := resizeOp.volumePlugin - - var err error - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - - resizeResponse := inTreeResizeResponse{ - pvc: pvc, - pv: pv, +func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) { + if volumeToMount.VolumeSpec != nil && + volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { + klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName) + return false, nil } - if permitNodeExpansion(pvc, pv) { - // File system resize was requested, proceed - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) - - rsOpts.VolumeSpec = volumeToMount.VolumeSpec - rsOpts.NewSize = pvSpecCap - rsOpts.OldSize = pvcStatusCap - pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient) - - if err != nil { - msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) - klog.Errorf(msg.Error()) - resizeResponse.err = msg - return resizeResponse - } - - resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) - resizeResponse.resizeCalled = true - - if resizeErr != nil { - if volumetypes.IsOperationFinishedError(resizeErr) { - var markFailedError error - pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient) - // update the pvc with node expansion object - resizeResponse.pvc = pvc - resizeResponse.assumeResizeOpAsFinished = true - if markFailedError != nil { - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) - } - } - - // if driver returned FailedPrecondition error that means - // volume expansion should not be retried on this node but - // expansion operation should not block mounting - if volumetypes.IsFailedPreconditionError(resizeErr) { - actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse - } - - resizeResponse.err = resizeErr - return resizeResponse - } - resizeResponse.resizeFinished = resizeDone - - // Volume resizing is not done but it did not error out. This could happen if a CSI volume - // does not have node stage_unstage capability but was asked to resize the volume before - // node publish. In which case - we must retry resizing after node publish. - if !resizeDone { - return resizeResponse - } - - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) - - // File system resize succeeded, now update the PVC's Capacity to match the PV's - pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) - resizeResponse.pvc = pvc - - if err != nil { - resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) - // On retry, NodeExpandVolume will be called again but do nothing - return resizeResponse - } - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse + // Get expander, if possible + expandableVolumePlugin, _ := + og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec) + if expandableVolumePlugin != nil && + expandableVolumePlugin.RequiresFSResize() && + volumeToMount.VolumeSpec.PersistentVolume != nil { + return true, expandableVolumePlugin } - // somehow a resize operation was queued, but we can not perform any resizing because - // prechecks required for node expansion failed. Kubelet should not retry expanding the volume. - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse + return false, nil +} + +func (og *operationGenerator) nodeExpandVolume( + volumeToMount VolumeToMount, + actualStateOfWorld ActualStateOfWorldMounterUpdater, + rsOpts volume.NodeResizeOptions) (bool, error) { + + supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount) + + if supportsExpansion { + // lets use sizes handed over to us by caller for comparison + if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 { + pv := volumeToMount.VolumeSpec.PersistentVolume + pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) + if err != nil { + // Return error rather than leave the file system un-resized, caller will log and retry + return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err) + } + + if volumeToMount.VolumeSpec.ReadOnly { + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system") + klog.Warningf(detailedMsg) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) + return true, nil + } + resizeOp := nodeResizeOperationOpts{ + vmt: volumeToMount, + pvc: pvc, + pv: pv, + pluginResizeOpts: rsOpts, + volumePlugin: expandableVolumePlugin, + actualStateOfWorld: actualStateOfWorld, + } + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) + resizeFinished, err, _ := nodeExpander.expandOnPlugin() + return resizeFinished, err + } else { + return og.legacyCallNodeExpandOnPlugin(resizeOp) + } + } + } + return true, nil } // legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support // recovery from volume expansion failure func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) { pvc := resizeOp.pvc - pv := resizeOp.pv volumeToMount := resizeOp.vmt rsOpts := resizeOp.pluginResizeOpts actualStateOfWorld := resizeOp.actualStateOfWorld expandableVolumePlugin := resizeOp.volumePlugin + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + var err error - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - if pvcStatusCap.Cmp(pvSpecCap) < 0 { - // File system resize was requested, proceed - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) + // File system resize was requested, proceed + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) - rsOpts.VolumeSpec = volumeToMount.VolumeSpec - rsOpts.NewSize = pvSpecCap - rsOpts.OldSize = pvcStatusCap - resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) - if resizeErr != nil { - // if driver returned FailedPrecondition error that means - // volume expansion should not be retried on this node but - // expansion operation should not block mounting - if volumetypes.IsFailedPreconditionError(resizeErr) { - actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) - return true, nil - } - return false, resizeErr - } - // Volume resizing is not done but it did not error out. This could happen if a CSI volume - // does not have node stage_unstage capability but was asked to resize the volume before - // node publish. In which case - we must retry resizing after node publish. - if !resizeDone { - return false, nil - } - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) - // File system resize succeeded, now update the PVC's Capacity to match the PV's - _, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) - if err != nil { - // On retry, NodeExpandVolume will be called again but do nothing - return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + rsOpts.VolumeSpec = volumeToMount.VolumeSpec + + _, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) + if resizeErr != nil { + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error()) + return true, nil } + return false, resizeErr + } + + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) + + // if PVC already has new size, there is no need to update it. + if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 { return true, nil } + + // File system resize succeeded, now update the PVC's Capacity to match the PV's + _, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient) + if err != nil { + // On retry, NodeExpandVolume will be called again but do nothing + return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + } return true, nil } -func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - // if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded - if pvcStatusCap.Cmp(pvSpecCap) >= 0 { - return false - } - - resizeStatus := pvc.Status.ResizeStatus - // if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on - // the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where - // resizeStatus may not be set (old control-plane expansion controller etc). - if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress { - return true - } else { - klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus) - return false - } -} - func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go index c86613d54c5..78bc5bc6deb 100644 --- a/pkg/volume/util/operationexecutor/operation_generator_test.go +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -209,106 +209,6 @@ func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) { } } -func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) { - var tests = []struct { - name string - pvc *v1.PersistentVolumeClaim - pv *v1.PersistentVolume - recoverFeatureGate bool - - // expectations of test - expectedResizeStatus v1.PersistentVolumeClaimResizeStatus - expectedStatusSize resource.Quantity - expectResizeCall bool - assumeResizeOpAsFinished bool - expectError bool - }{ - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", - pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed), - pv: getTestPV("test-vol0", "2G"), - recoverFeatureGate: true, - - expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, - expectResizeCall: false, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("1G"), - }, - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", - pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), - pv: getTestPV("test-vol0", "2G"), - recoverFeatureGate: true, - expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, - expectResizeCall: true, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("2G"), - }, - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", - pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), - pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), - recoverFeatureGate: true, - expectError: true, - expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, - expectResizeCall: true, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("1G"), - }, - } - - for i := range tests { - test := tests[i] - t.Run(test.name, func(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() - volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) - - pvc := test.pvc - pv := test.pv - pod := getTestPod("test-pod", pvc.Name) - og := getTestOperationGenerator(volumePluginMgr, pvc, pv) - - vmt := VolumeToMount{ - Pod: pod, - VolumeName: v1.UniqueVolumeName(pv.Name), - VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), - } - resizeOp := nodeResizeOperationOpts{ - pvc: pvc, - pv: pv, - volumePlugin: fakePlugin, - vmt: vmt, - actualStateOfWorld: nil, - } - ogInstance, _ := og.(*operationGenerator) - expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp) - - pvc = expansionResponse.pvc - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - - if !test.expectError && expansionResponse.err != nil { - t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err) - } - if test.expectError && expansionResponse.err == nil { - t.Errorf("For test %s, expected error but got none", test.name) - } - - if test.expectResizeCall != expansionResponse.resizeCalled { - t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled) - } - if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished { - t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished) - } - if test.expectedResizeStatus != *pvc.Status.ResizeStatus { - t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus) - } - if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { - t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) - } - }) - } -} - func getTestPod(podName, pvcName string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{