diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 652d5421d9a..5d580a5f557 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" @@ -581,6 +583,16 @@ func (asw *actualStateOfWorld) GetAttachState( return AttachStateDetached } +// SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller. +func (asw *actualStateOfWorld) InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { + klog.V(5).Infof("no-op InitializeClaimSize call in attach-detach controller.") +} + +func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity { + // not needed in attach-detach controller + return nil +} + func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { asw.RLock() defer asw.RUnlock() diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 4ccd922569d..7deb6df910d 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -25,6 +25,7 @@ import ( "sync" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/volume" @@ -106,7 +107,7 @@ type ActualStateOfWorld interface { // volumes, depend on this to update the contents of the volume. // All volume mounting calls should be idempotent so a second mount call for // volumes that do not need to update contents should not fail. - PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error) + PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) // PodRemovedFromVolume returns true if the given pod does not exist in the list of // mountedPods for the given volume in the cache, indicating that the pod has @@ -160,11 +161,6 @@ type ActualStateOfWorld interface { // no longer referenced and may be globally unmounted and detached. GetUnmountedVolumes() []AttachedVolume - // MarkFSResizeRequired marks each volume that is successfully attached and - // mounted for the specified pod as requiring file system resize (if the plugin for the - // volume indicates it requires file system resize). - MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) - // GetAttachedVolumes returns a list of volumes that is known to be attached // to the node. This list can be used to determine volumes that are either in-use // or have a mount/unmount operation pending. @@ -284,6 +280,10 @@ type attachedVolume struct { // volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error // for this volume and volume expansion on this node should not be retried volumeInUseErrorForExpansion bool + + // persistentVolumeSize records size of the volume when pod was started or + // size after successful completion of volume expansion operation. + persistentVolumeSize *resource.Quantity } // The mountedPod object represents a pod for which the kubelet volume manager @@ -325,10 +325,6 @@ type mountedPod struct { // volumeGidValue contains the value of the GID annotation, if present. volumeGidValue string - // fsResizeRequired indicates the underlying volume has been successfully - // mounted to this pod but its size has been expanded after that. - fsResizeRequired bool - // volumeMountStateForPod stores state of volume mount for the pod. if it is: // - VolumeMounted: means volume for pod has been successfully mounted // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted @@ -548,30 +544,17 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M return nil } -func (asw *actualStateOfWorld) MarkVolumeAsResized( - podName volumetypes.UniquePodName, - volumeName v1.UniqueVolumeName) error { +func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool { asw.Lock() defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - return fmt.Errorf( - "no volume with the name %q exists in the list of attached volumes", - volumeName) + volumeObj, ok := asw.attachedVolumes[volumeName] + if ok { + volumeObj.persistentVolumeSize = claimSize + asw.attachedVolumes[volumeName] = volumeObj + return true } - - podObj, podExists := volumeObj.mountedPods[podName] - if !podExists { - return fmt.Errorf( - "no pod with the name %q exists in the mounted pods list of volume %s", - podName, - volumeName) - } - klog.V(5).InfoS("Pod volume has been resized", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName) - podObj.fsResizeRequired = false - asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - return nil + return false } func (asw *actualStateOfWorld) MarkRemountRequired( @@ -596,40 +579,6 @@ func (asw *actualStateOfWorld) MarkRemountRequired( } } -func (asw *actualStateOfWorld) MarkFSResizeRequired( - volumeName v1.UniqueVolumeName, - podName volumetypes.UniquePodName) { - asw.Lock() - defer asw.Unlock() - volumeObj, volumeExists := asw.attachedVolumes[volumeName] - if !volumeExists { - klog.InfoS("MarkFSResizeRequired for volume failed as volume does not exist", "volumeName", volumeName) - return - } - - podObj, podExists := volumeObj.mountedPods[podName] - if !podExists { - klog.InfoS("MarkFSResizeRequired for volume failed because the pod does not exist", "uniquePodName", podName, "volumeName", volumeName) - return - } - - volumePlugin, err := - asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec) - if err != nil || volumePlugin == nil { - // Log and continue processing - klog.ErrorS(nil, "MarkFSResizeRequired failed to find expandable plugin for volume", "uniquePodName", podObj.podName, "volumeName", volumeObj.volumeName, "volumeSpecName", podObj.volumeSpec.Name()) - return - } - - if volumePlugin.RequiresFSResize() { - if !podObj.fsResizeRequired { - klog.V(3).InfoS("PVC volume of the pod requires file system resize", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName) - podObj.fsResizeRequired = true - } - asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - } -} - func (asw *actualStateOfWorld) SetDeviceMountState( volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error { asw.Lock() @@ -651,6 +600,30 @@ func (asw *actualStateOfWorld) SetDeviceMountState( return nil } +func (asw *actualStateOfWorld) InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { + asw.Lock() + defer asw.Unlock() + + volumeObj, ok := asw.attachedVolumes[volumeName] + // only set volume claim size if claimStatusSize is zero + // this can happen when volume was rebuilt after kubelet startup + if ok && volumeObj.persistentVolumeSize == nil { + volumeObj.persistentVolumeSize = claimSize + asw.attachedVolumes[volumeName] = volumeObj + } +} + +func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity { + asw.RLock() + defer asw.RUnlock() + + volumeObj, ok := asw.attachedVolumes[volumeName] + if ok { + return volumeObj.persistentVolumeSize + } + return nil +} + func (asw *actualStateOfWorld) DeletePodFromVolume( podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error { asw.Lock() @@ -691,9 +664,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro return nil } -func (asw *actualStateOfWorld) PodExistsInVolume( - podName volumetypes.UniquePodName, - volumeName v1.UniqueVolumeName) (bool, string, error) { +func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) { asw.RLock() defer asw.RUnlock() @@ -711,15 +682,42 @@ func (asw *actualStateOfWorld) PodExistsInVolume( if podObj.remountRequired { return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } - if podObj.fsResizeRequired && - !volumeObj.volumeInUseErrorForExpansion { - return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) + if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume { + return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize) } } return podExists, volumeObj.devicePath, nil } +func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) { + currentSize := resource.Quantity{} + if volumeObj.persistentVolumeSize != nil { + currentSize = volumeObj.persistentVolumeSize.DeepCopy() + } + if volumeObj.volumeInUseErrorForExpansion { + return currentSize, false + } + if volumeObj.persistentVolumeSize == nil || desiredVolumeSize.IsZero() { + return currentSize, false + } + + if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 { + volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) + if err != nil || volumePlugin == nil { + // Log and continue processing + klog.InfoS("PodExistsInVolume failed to find expandable plugin", + "volume", volumeObj.volumeName, + "volumeSpecName", volumeObj.spec.Name()) + return currentSize, false + } + if volumePlugin.RequiresFSResize() { + return currentSize, true + } + } + return currentSize, false +} + func (asw *actualStateOfWorld) PodRemovedFromVolume( podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) bool { @@ -964,29 +962,31 @@ func newRemountRequiredError( // fsResizeRequiredError is an error returned when PodExistsInVolume() found // volume/pod attached/mounted but fsResizeRequired was true, indicating the // given volume receives an resize request after attached/mounted. -type fsResizeRequiredError struct { - volumeName v1.UniqueVolumeName - podName volumetypes.UniquePodName +type FsResizeRequiredError struct { + CurrentSize resource.Quantity + volumeName v1.UniqueVolumeName + podName volumetypes.UniquePodName } -func (err fsResizeRequiredError) Error() string { +func (err FsResizeRequiredError) Error() string { return fmt.Sprintf( "volumeName %q mounted to %q needs to resize file system", err.volumeName, err.podName) } func newFsResizeRequiredError( - volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error { - return fsResizeRequiredError{ - volumeName: volumeName, - podName: podName, + volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, currentSize resource.Quantity) error { + return FsResizeRequiredError{ + CurrentSize: currentSize, + volumeName: volumeName, + podName: podName, } } // IsFSResizeRequiredError returns true if the specified error is a // fsResizeRequiredError. func IsFSResizeRequiredError(err error) bool { - _, ok := err.(fsResizeRequiredError) + _, ok := err.(FsResizeRequiredError) return ok } diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 1456bdd888b..cd9fe6c52c8 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "k8s.io/apimachinery/pkg/api/resource" "testing" "github.com/stretchr/testify/require" @@ -676,7 +677,7 @@ func TestUncertainVolumeMounts(t *testing.T) { t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name()) } - volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1) + volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{}) if volExists { t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) } @@ -762,7 +763,7 @@ func verifyPodExistsInVolumeAsw( expectedDevicePath string, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(expectedPodName, expectedVolumeName) + asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{}) if err != nil { t.Fatalf( "ASW PodExistsInVolume failed. Expected: Actual: <%v>", err) @@ -804,7 +805,7 @@ func verifyPodDoesntExistInVolumeAsw( expectVolumeToExist bool, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(podToCheck, volumeToCheck) + asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{}) if !expectVolumeToExist && err == nil { t.Fatalf( "ASW PodExistsInVolume did not return error. Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go index c96b804a377..0a999833a5e 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go @@ -124,6 +124,11 @@ type DesiredStateOfWorld interface { // MarkVolumeAttachability updates the volume's attachability for a given volume MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool) + + // UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world + // so as it can be compared against actual size and volume expansion performed + // if necessary + UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) } // VolumeToMount represents a volume that is attached to this node and needs to @@ -186,6 +191,10 @@ type volumeToMount struct { // desiredSizeLimit indicates the desired upper bound on the size of the volume // (if so implemented) desiredSizeLimit *resource.Quantity + + // persistentVolumeSize records desired size of a persistent volume. + // Usually this value reflects size recorded in pv.Spec.Capacity + persistentVolumeSize *resource.Quantity } // The pod object represents a pod that references the underlying volume and @@ -274,7 +283,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( } } } - dsw.volumesToMount[volumeName] = volumeToMount{ + vmt := volumeToMount{ volumeName: volumeName, podsToMount: make(map[types.UniquePodName]podToMount), pluginIsAttachable: attachable, @@ -283,6 +292,16 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( reportedInUse: false, desiredSizeLimit: sizeLimit, } + // record desired size of the volume + if volumeSpec.PersistentVolume != nil { + pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() + if pvCap != nil { + pvCapCopy := pvCap.DeepCopy() + vmt.persistentVolumeSize = &pvCapCopy + } + } + + dsw.volumesToMount[volumeName] = vmt } oldPodMount, ok := dsw.volumesToMount[volumeName].podsToMount[podName] mountRequestTime := time.Now() @@ -347,6 +366,19 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume( } } +// UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and +// should be only used for persistent volumes. +func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) { + dsw.Lock() + defer dsw.Unlock() + + vol, volExists := dsw.volumesToMount[volumeName] + if volExists { + vol.persistentVolumeSize = size + dsw.volumesToMount[volumeName] = vol + } +} + func (dsw *desiredStateOfWorld) VolumeExists( volumeName v1.UniqueVolumeName) bool { dsw.RLock() @@ -403,21 +435,25 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount { volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */) for volumeName, volumeObj := range dsw.volumesToMount { for podName, podObj := range volumeObj.podsToMount { - volumesToMount = append( - volumesToMount, - VolumeToMount{ - VolumeToMount: operationexecutor.VolumeToMount{ - VolumeName: volumeName, - PodName: podName, - Pod: podObj.pod, - VolumeSpec: podObj.volumeSpec, - PluginIsAttachable: volumeObj.pluginIsAttachable, - PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable, - OuterVolumeSpecName: podObj.outerVolumeSpecName, - VolumeGidValue: volumeObj.volumeGidValue, - ReportedInUse: volumeObj.reportedInUse, - MountRequestTime: podObj.mountRequestTime, - DesiredSizeLimit: volumeObj.desiredSizeLimit}}) + vmt := VolumeToMount{ + VolumeToMount: operationexecutor.VolumeToMount{ + VolumeName: volumeName, + PodName: podName, + Pod: podObj.pod, + VolumeSpec: podObj.volumeSpec, + PluginIsAttachable: volumeObj.pluginIsAttachable, + PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable, + OuterVolumeSpecName: podObj.outerVolumeSpecName, + VolumeGidValue: volumeObj.volumeGidValue, + ReportedInUse: volumeObj.reportedInUse, + MountRequestTime: podObj.mountRequestTime, + DesiredSizeLimit: volumeObj.desiredSizeLimit, + }, + } + if volumeObj.persistentVolumeSize != nil { + vmt.PersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy() + } + volumesToMount = append(volumesToMount, vmt) } } return volumesToMount diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index e83325a60e9..20316818018 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -195,13 +195,12 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume } - processedVolumesForFSResize := sets.NewString() for _, pod := range dswp.podManager.GetPods() { if dswp.podStateProvider.ShouldPodContainersBeTerminating(pod.UID) { // Do not (re)add volumes for pods that can't also be starting containers continue } - dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize) + dswp.processPodVolumes(pod, mountedVolumesForPod) } } @@ -270,8 +269,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { // desired state of the world. func (dswp *desiredStateOfWorldPopulator) processPodVolumes( pod *v1.Pod, - mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, - processedVolumesForFSResize sets.String) { + mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) { if pod == nil { return } @@ -314,9 +312,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( // sync reconstructed volume dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name) - dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, - uniquePodName, mountedVolumesForPod, processedVolumesForFSResize) - + dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod) } // some of the volume additions may have failed, should not mark this pod as fully processed @@ -337,21 +333,16 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( } -// checkVolumeFSResize checks whether a PVC mounted by the pod requires file -// system resize or not. If so, marks this volume as fsResizeRequired in ASW. -// - mountedVolumesForPod stores all mounted volumes in ASW, because online -// volume resize only considers mounted volumes. -// - processedVolumesForFSResize stores all volumes we have checked in current loop, -// because file system resize operation is a global operation for volume, so -// we only need to check it once if more than one pod use it. +// checkVolumeFSResize records desired PVC size for a volume mounted by the pod. +// It is used for comparison with actual size(coming from pvc.Status.Capacity) and calling +// volume expansion on the node if needed. func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( pod *v1.Pod, podVolume v1.Volume, pvc *v1.PersistentVolumeClaim, volumeSpec *volume.Spec, uniquePodName volumetypes.UniquePodName, - mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, - processedVolumesForFSResize sets.String) { + mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) { if podVolume.PersistentVolumeClaim == nil { // Only PVC supports resize operation. return @@ -363,11 +354,6 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( // or online resize in subsequent loop(after we confirm it has been mounted). return } - if processedVolumesForFSResize.Has(string(uniqueVolumeName)) { - // File system resize operation is a global operation for volume, - // so we only need to check it once if more than one pod use it. - return - } // volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted. // This is the same flag that determines filesystem resizing behaviour for offline resizing and hence // we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly. @@ -376,10 +362,12 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name) return } - if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) { - dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName) - } - processedVolumesForFSResize.Insert(string(uniqueVolumeName)) + pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() + pvcStatusCap := pvc.Status.Capacity.Storage() + dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap) + + // in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value + dswp.actualStateOfWorld.InitializeClaimSize(uniqueVolumeName, pvcStatusCap) } func getUniqueVolumeName( @@ -397,12 +385,6 @@ func getUniqueVolumeName( return mountedVolume.VolumeName, true } -func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { - capacity := pvc.Status.Capacity[v1.ResourceStorage] - requested := pv.Spec.Capacity[v1.ResourceStorage] - return requested.Cmp(capacity) > 0 -} - // podPreviouslyProcessed returns true if the volumes for this pod have already // been processed/reprocessed by the populator. Otherwise, the volumes for this pod need to // be reprocessed. diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index e32dcd6a9f2..ccbf21a124c 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -977,7 +977,7 @@ func TestCheckVolumeFSResize(t *testing.T) { }, verify: func(t *testing.T, vols []v1.UniqueVolumeName, volName v1.UniqueVolumeName) { if len(vols) == 0 { - t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") + t.Fatalf("Requested resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") } if len(vols) != 1 { t.Errorf("Some unexpected volumes are marked as fsResizeRequired: %v", vols) @@ -1053,7 +1053,7 @@ func TestCheckVolumeFSResize(t *testing.T) { func() { tc.resize(t, pv, pvc, dswp) - resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage()) tc.verify(t, resizeRequiredVolumes, uniqueVolumeName) }() @@ -1099,16 +1099,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te } func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, - dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { + dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { dswp.ReprocessPod(uniquePodName) dswp.findAndAddNewPods() - return getResizeRequiredVolumes(dsw, asw) + return getResizeRequiredVolumes(dsw, asw, newSize) } -func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { +func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName { resizeRequiredVolumes := []v1.UniqueVolumeName{} for _, volumeToMount := range dsw.GetVolumesToMount() { - _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize) if cache.IsFSResizeRequiredError(err) { resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) } @@ -1127,7 +1127,6 @@ func verifyVolumeExistsInVolumesToMount(t *testing.T, expectedVolumeName v1.Uniq expectReportedInUse, volume.ReportedInUse) } - return } } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 1188d2ef72f..c9af55e62c2 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -28,6 +28,8 @@ import ( "path/filepath" "time" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" "k8s.io/mount-utils" utilpath "k8s.io/utils/path" @@ -169,7 +171,7 @@ func (rc *reconciler) reconcile() { // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. - rc.mountAttachVolumes() + rc.mountOrAttachVolumes() // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() @@ -193,82 +195,95 @@ func (rc *reconciler) unmountVolumes() { } } -func (rc *reconciler) mountAttachVolumes() { +func (rc *reconciler) mountOrAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { - volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { - if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { - //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens - if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) - continue - } - // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait - // for controller to finish attaching volume. - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.VerifyControllerAttachedVolume( - volumeToMount.VolumeToMount, - rc.nodeName, - rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } - } else { - // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, - // so attach it - volumeToAttach := operationexecutor.VolumeToAttach{ - VolumeName: volumeToMount.VolumeName, - VolumeSpec: volumeToMount.VolumeSpec, - NodeName: rc.nodeName, - } - klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } - } + rc.waitForVolumeAttach(volumeToMount) } else if !volMounted || cache.IsRemountRequiredError(err) { - // Volume is not mounted, or is already mounted, but requires remounting - remountingLogStr := "" - isRemount := cache.IsRemountRequiredError(err) - if isRemount { - remountingLogStr = "Volume is already mounted to pod, but remount was requested." - } - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.MountVolume( - rc.waitForAttachTimeout, - volumeToMount.VolumeToMount, - rc.actualStateOfWorld, - isRemount) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - if remountingLogStr == "" { - klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - } else { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - } - } + rc.mountAttachedVolumes(volumeToMount, err) } else if cache.IsFSResizeRequiredError(err) { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.ExpandInUseVolume( - volumeToMount.VolumeToMount, - rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } + fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError) + rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize) + } + } +} + +func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) { + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize) + + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + + if err == nil { + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + } +} + +func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) { + // Volume is not mounted, or is already mounted, but requires remounting + remountingLogStr := "" + isRemount := cache.IsRemountRequiredError(podExistError) + if isRemount { + remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.MountVolume( + rc.waitForAttachTimeout, + volumeToMount.VolumeToMount, + rc.actualStateOfWorld, + isRemount) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + if remountingLogStr == "" { + klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + } else { + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + } + } +} + +func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { + if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { + //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens + if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse { + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) + return + } + // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait + // for controller to finish attaching volume. + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.VerifyControllerAttachedVolume( + volumeToMount.VolumeToMount, + rc.nodeName, + rc.actualStateOfWorld) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + } + } else { + // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, + // so attach it + volumeToAttach := operationexecutor.VolumeToAttach{ + VolumeName: volumeToMount.VolumeName, + VolumeSpec: volumeToMount.VolumeSpec, + NodeName: rc.nodeName, + } + klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) } } } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index a43110c5341..3b57460527f 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -1161,57 +1161,9 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: tc.pvName, - UID: "pvuid", - }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Name: "pvc"}, - VolumeMode: tc.volumeMode, - Capacity: v1.ResourceList{ - v1.ResourceStorage: tc.oldPVSize, - }, - }, - } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pvc", - UID: "pvcuid", - }, - Spec: v1.PersistentVolumeClaimSpec{ - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceStorage: tc.pvcSize, - }, - }, - VolumeName: "pv", - VolumeMode: tc.volumeMode, - }, - Status: v1.PersistentVolumeClaimStatus{ - Capacity: v1.ResourceList{ - v1.ResourceStorage: tc.pvcStatusSize, - }, - }, - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc.Name, - }, - }, - }, - }, - }, - } + pv := getTestPV(tc.pvName, tc.volumeMode, tc.oldPVSize) + pvc := getTestPVC("pv", tc.volumeMode, tc.pvcSize, tc.pvcStatusSize) + pod := getTestPod(pvc.Name) // deep copy before reconciler runs to avoid data race. pvWithSize := pv.DeepCopy() @@ -1284,10 +1236,12 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize volumeSpec = &volume.Spec{PersistentVolume: pvWithSize} dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) - // mark volume as resize required - asw.MarkFSResizeRequired(volumeName, podName) - _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) + t.Logf("Changing size of the volume to %s", tc.newPVSize.String()) + newSize := tc.newPVSize.DeepCopy() + dsw.UpdatePersistentVolumeSize(volumeName, &newSize) + + _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize) if tc.expansionFailed { if cache.IsFSResizeRequiredError(podExistErr) { t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) @@ -1299,7 +1253,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { go reconciler.Run(wait.NeverStop) waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize) return mounted && err == nil, nil }) if waitErr != nil { @@ -1311,6 +1265,69 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { } } +func getTestPVC(pvName string, volumeMode *v1.PersistentVolumeMode, specSize, statusSize resource.Quantity) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: specSize, + }, + }, + VolumeName: pvName, + VolumeMode: volumeMode, + }, + Status: v1.PersistentVolumeClaimStatus{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: statusSize, + }, + }, + } + return pvc +} + +func getTestPV(pvName string, volumeMode *v1.PersistentVolumeMode, pvSize resource.Quantity) *v1.PersistentVolume { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: volumeMode, + Capacity: v1.ResourceList{ + v1.ResourceStorage: pvSize, + }, + }, + } + return pv +} + +func getTestPod(claimName string) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: claimName, + }, + }, + }, + }, + }, + } + return pod +} + func Test_UncertainDeviceGlobalMounts(t *testing.T) { var tests = []struct { name string @@ -1791,7 +1808,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN err := retryWithExponentialBackOff( testOperationBackOffDuration, func() (bool, error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}) if mounted || err != nil { return false, nil } diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 47234e99de4..85d5767858d 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -495,7 +495,6 @@ func (spec *Spec) IsKubeletExpandable() bool { return spec.PersistentVolume.Spec.FlexVolume != nil default: return false - } } diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 401a335f4b8..1a5f029982b 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -175,6 +175,7 @@ type FakeVolumePlugin struct { LastProvisionerOptions VolumeOptions NewAttacherCallCount int NewDetacherCallCount int + NodeExpandCallCount int VolumeLimits map[string]int64 VolumeLimitsError error LimitKey string @@ -471,6 +472,7 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool { } func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) { + plugin.NodeExpandCallCount++ if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName { return false, volumetypes.NewFailedPreconditionError("volume-in-use") } diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 6d8e5a1b3d1..88d980e711d 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -17,6 +17,7 @@ limitations under the License. package operationexecutor import ( + "k8s.io/apimachinery/pkg/api/resource" "time" v1 "k8s.io/api/core/v1" @@ -108,7 +109,7 @@ func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeC return f.recordFuncCall("GenerateExpandVolumeFunc"), nil } -func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil } diff --git a/pkg/volume/util/operationexecutor/node_expander.go b/pkg/volume/util/operationexecutor/node_expander.go new file mode 100644 index 00000000000..a1beedec65e --- /dev/null +++ b/pkg/volume/util/operationexecutor/node_expander.go @@ -0,0 +1,151 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + kevents "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +type NodeExpander struct { + nodeResizeOperationOpts + kubeClient clientset.Interface + recorder record.EventRecorder + + // computed via precheck + pvcStatusCap resource.Quantity + pvCap resource.Quantity + resizeStatus *v1.PersistentVolumeClaimResizeStatus + + // pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet + // PVC has already been updated - possibly because expansion already succeeded on different node. + // This can happen when a RWX PVC is expanded. + pvcAlreadyUpdated bool +} + +func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander { + return &NodeExpander{ + kubeClient: client, + nodeResizeOperationOpts: resizeOp, + recorder: recorder, + } +} + +// testResponseData is merely used for doing sanity checks in unit tests +type testResponseData struct { + // indicates that resize operation was called on underlying volume driver + // mainly useful for testing. + resizeCalledOnPlugin bool + + // Indicates whether kubelet should assume resize operation as finished. + // For kubelet - resize operation could be assumed as finished even if + // actual resizing is *not* finished. This can happen, because certain prechecks + // are failing and kubelet should not retry expansion, or it could happen + // because resize operation is genuinely finished. + assumeResizeFinished bool +} + +// runPreCheck performs some sanity checks before expansion can be performed on the PVC. +func (ne *NodeExpander) runPreCheck() bool { + ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage] + ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage] + + ne.resizeStatus = ne.pvc.Status.ResizeStatus + + // PVC is already expanded but we are still trying to expand the volume because + // last recorded size in ASOW is older. This can happen for RWX volume types. + if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && (ne.resizeStatus == nil || *ne.resizeStatus == v1.PersistentVolumeClaimNoExpansionInProgress) { + ne.pvcAlreadyUpdated = true + } + + // if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we + // should allow volume expansion on the node to proceed. We are making an exception for + // resizeStatus being nil because it will support use cases where + // resizeStatus may not be set (old control-plane expansion controller etc). + if ne.resizeStatus == nil || + ne.pvcAlreadyUpdated || + *ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || + *ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress { + return true + } + + return false +} + +func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) { + allowExpansion := ne.runPreCheck() + if !allowExpansion { + return false, nil, testResponseData{false, true} + } + + var err error + nodeName := ne.vmt.Pod.Spec.NodeName + + if !ne.pvcAlreadyUpdated { + ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient) + + if err != nil { + msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) + klog.Errorf(msg.Error()) + return false, err, testResponseData{} + } + } + _, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts) + if resizeErr != nil { + if volumetypes.IsOperationFinishedError(resizeErr) { + var markFailedError error + ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient) + if markFailedError != nil { + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + } + } + + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName) + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) + return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} + } + return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} + } + simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName) + ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod)) + + // no need to update PVC object if we already updated it + if ne.pvcAlreadyUpdated { + return true, nil, testResponseData{true, true} + } + + // File system resize succeeded, now update the PVC's Capacity to match the PV's + ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient) + if err != nil { + return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), testResponseData{true, true} + } + return true, nil, testResponseData{true, true} +} diff --git a/pkg/volume/util/operationexecutor/node_expander_test.go b/pkg/volume/util/operationexecutor/node_expander_test.go new file mode 100644 index 00000000000..bf2be4b45b5 --- /dev/null +++ b/pkg/volume/util/operationexecutor/node_expander_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "testing" +) + +func TestNodeExpander(t *testing.T) { + var tests = []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + + // desired size, defaults to pv.Spec.Capacity + desiredSize *resource.Quantity + // actualSize, defaults to pvc.Status.Capacity + actualSize *resource.Quantity + + // expectations of test + expectedResizeStatus v1.PersistentVolumeClaimResizeStatus + expectedStatusSize resource.Quantity + expectResizeCall bool + assumeResizeOpAsFinished bool + expectError bool + }{ + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", + pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: false, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", + pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV("test-vol0", "2G"), + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", + pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + expectError: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize > actualSize", + pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "2G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectedStatusSize: resource.MustParse("2G"), + }, + } + + for i := range tests { + test := tests[i] + t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + + pvc := test.pvc + pv := test.pv + pod := getTestPod("test-pod", pvc.Name) + og := getTestOperationGenerator(volumePluginMgr, pvc, pv) + + vmt := VolumeToMount{ + Pod: pod, + VolumeName: v1.UniqueVolumeName(pv.Name), + VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), + } + desiredSize := test.desiredSize + if desiredSize == nil { + desiredSize = pv.Spec.Capacity.Storage() + } + actualSize := test.actualSize + if actualSize == nil { + actualSize = pvc.Status.Capacity.Storage() + } + resizeOp := nodeResizeOperationOpts{ + pvc: pvc, + pv: pv, + volumePlugin: fakePlugin, + vmt: vmt, + actualStateOfWorld: nil, + pluginResizeOpts: volume.NodeResizeOptions{ + VolumeSpec: vmt.VolumeSpec, + NewSize: *desiredSize, + OldSize: *actualSize, + }, + } + ogInstance, _ := og.(*operationGenerator) + nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder) + + _, err, expansionResponse := nodeExpander.expandOnPlugin() + + pvc = nodeExpander.pvc + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + + if !test.expectError && err != nil { + t.Errorf("For test %s, expected no error got: %v", test.name, err) + } + if test.expectError && err == nil { + t.Errorf("For test %s, expected error but got none", test.name) + } + + if test.expectResizeCall != expansionResponse.resizeCalledOnPlugin { + t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalledOnPlugin) + } + if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeFinished { + t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeFinished) + } + if test.expectedResizeStatus != *pvc.Status.ResizeStatus { + t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus) + } + if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { + t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) + } + }) + } +} diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 244b63e5c3f..3a88fe052b6 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -148,7 +148,7 @@ type OperationExecutor interface { // and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. - ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error + ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error) // CheckVolumeExistenceOperation checks volume existence @@ -201,7 +201,7 @@ type ActualStateOfWorldMounterUpdater interface { MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error // Marks the specified volume's file system resize request is finished. - MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error + MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool // GetDeviceMountState returns mount state of the device in global path GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState @@ -245,6 +245,11 @@ type ActualStateOfWorldAttacherUpdater interface { // Unmarks the desire to detach for the specified volume (add the volume back to // the node's volumesToReportAsAttached list) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) + + // InitializeClaimSize sets pvc claim size by reading pvc.Status.Capacity + InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) + + GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity } // VolumeLogger defines a set of operations for generating volume-related logging and error msgs @@ -420,6 +425,10 @@ type VolumeToMount struct { // time at which volume was requested to be mounted MountRequestTime time.Time + + // PersistentVolumeSize stores desired size of the volume. + // usually this is the size if pv.Spec.Capacity + PersistentVolumeSize resource.Quantity } // DeviceMountState represents device mount state in a global path. @@ -994,8 +1003,8 @@ func (oe *operationExecutor) UnmountDevice( deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) } -func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { - generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld) +func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error { + generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize) if err != nil { return err } diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 521cdf99ac0..59f985de964 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -18,6 +18,7 @@ package operationexecutor import ( "fmt" + "k8s.io/apimachinery/pkg/api/resource" "strconv" "testing" "time" @@ -668,7 +669,7 @@ func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.P }, nil } -func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { opFunc := func() volumetypes.OperationContext { startOperationAndBlock(fopg.ch, fopg.quit) return volumetypes.NewOperationContext(nil, nil, false) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 50fcac3cbf0..898acde5e5c 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -87,6 +87,16 @@ type operationGenerator struct { translator InTreeToCSITranslator } +type inTreeResizeResponse struct { + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + + err error + // indicates that resize operation was called on underlying volume driver + // mainly useful for testing. + resizeCalled bool +} + // NewOperationGenerator is returns instance of operationGenerator func NewOperationGenerator(kubeClient clientset.Interface, volumePluginMgr *volume.VolumePluginMgr, @@ -150,7 +160,9 @@ type OperationGenerator interface { GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. - GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) + // Along with volumeToMount and actualStateOfWorld, the function expects current size of volume on the node as an argument. The current + // size here always refers to capacity last recorded in actualStateOfWorld from pvc.Status.Capacity + GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) } type inTreeResizeOpts struct { @@ -161,27 +173,6 @@ type inTreeResizeOpts struct { volumePlugin volume.ExpandableVolumePlugin } -type inTreeResizeResponse struct { - pvc *v1.PersistentVolumeClaim - pv *v1.PersistentVolume - err error - - // Indicates whether kubelet should assume resize operation as finished. - // For kubelet - resize operation could be assumed as finished even if - // actual resizing is *not* finished. This can happen, because certain prechecks - // are failing and kubelet should not retry expansion, or it could happen - // because resize operation is genuinely finished. - assumeResizeOpAsFinished bool - - // indicates that resize operation was called on underlying volume driver - // mainly useful for testing. - resizeCalled bool - - // indicates whether entire volume expansion is finished or not - // only used from nodeExpansion calls. Mainly used for testing. - resizeFinished bool -} - type nodeResizeOperationOpts struct { vmt VolumeToMount pvc *v1.PersistentVolumeClaim @@ -712,7 +703,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) resizeOptions.DeviceMountPath = volumeMounter.GetPath() - _, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) + _, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -1205,7 +1196,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( DevicePath: devicePath, DeviceStagePath: stagingPath, } - _, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) + _, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) @@ -1491,6 +1482,22 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) + claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName) + + // only fetch claimSize if it was not set previously + if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil { + pv := volumeToMount.VolumeSpec.PersistentVolume + pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) + if err != nil { + eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + pvcStatusSize := pvc.Status.Capacity.Storage() + if pvcStatusSize != nil { + claimSize = pvcStatusSize + } + } + if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is // assumed to be attached and the actual state of the world is @@ -1503,7 +1510,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - + actualStateOfWorld.InitializeClaimSize(volumeToMount.VolumeName, claimSize) return volumetypes.NewOperationContext(nil, nil, migrated) } @@ -1544,6 +1551,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } + actualStateOfWorld.InitializeClaimSize(volumeToMount.VolumeName, claimSize) return volumetypes.NewOperationContext(nil, nil, migrated) } } @@ -1894,7 +1902,7 @@ func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolu func (og *operationGenerator) GenerateExpandInUseVolumeFunc( volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) { volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) @@ -1907,9 +1915,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc( var eventErr, detailedErr error migrated := false + if currentSize.IsZero() || volumeToMount.PersistentVolumeSize.IsZero() { + err := fmt.Errorf("current or new size of the volume is not set") + eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + resizeOptions := volume.NodeResizeOptions{ VolumeSpec: volumeToMount.VolumeSpec, DevicePath: volumeToMount.DevicePath, + OldSize: currentSize, + NewSize: volumeToMount.PersistentVolumeSize, } fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) if err != nil { @@ -2011,10 +2027,11 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, return false, e1, e2 } if resizeDone { - markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) - if markFSResizedErr != nil { + markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize) + if !markingDone { // On failure, return error. Caller will log and retry. - e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr) + genericFailureError := fmt.Errorf("unable to mark volume as resized") + e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError) return false, e1, e2 } return true, nil, nil @@ -2022,25 +2039,9 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, return false, nil, nil } -func (og *operationGenerator) nodeExpandVolume( - volumeToMount VolumeToMount, - actualStateOfWorld ActualStateOfWorldMounterUpdater, - rsOpts volume.NodeResizeOptions) (bool, error) { - - if volumeToMount.VolumeSpec != nil && - volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { - klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName) - return true, nil - } - - // Get expander, if possible - expandableVolumePlugin, _ := - og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec) - - if expandableVolumePlugin != nil && - expandableVolumePlugin.RequiresFSResize() && - volumeToMount.VolumeSpec.PersistentVolume != nil { - +func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) { + supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount) + if supportsExpansion { pv := volumeToMount.VolumeSpec.PersistentVolume pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) if err != nil { @@ -2055,193 +2056,146 @@ func (og *operationGenerator) nodeExpandVolume( og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) return true, nil } - resizeOp := nodeResizeOperationOpts{ - vmt: volumeToMount, - pvc: pvc, - pv: pv, - pluginResizeOpts: rsOpts, - volumePlugin: expandableVolumePlugin, - actualStateOfWorld: actualStateOfWorld, - } - if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { - resizeResponse := og.callNodeExpandOnPlugin(resizeOp) - return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err - } else { - return og.legacyCallNodeExpandOnPlugin(resizeOp) + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] + if pvcStatusCap.Cmp(pvSpecCap) < 0 { + rsOpts.NewSize = pvSpecCap + rsOpts.OldSize = pvcStatusCap + resizeOp := nodeResizeOperationOpts{ + vmt: volumeToMount, + pvc: pvc, + pv: pv, + pluginResizeOpts: rsOpts, + volumePlugin: expandablePlugin, + actualStateOfWorld: actualStateOfWorld, + } + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) + resizeFinished, err, _ := nodeExpander.expandOnPlugin() + return resizeFinished, err + } else { + return og.legacyCallNodeExpandOnPlugin(resizeOp) + } } } return true, nil } -// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support -// recovery from volume expansion failure. -func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse { - pvc := resizeOp.pvc - pv := resizeOp.pv - volumeToMount := resizeOp.vmt - rsOpts := resizeOp.pluginResizeOpts - actualStateOfWorld := resizeOp.actualStateOfWorld - expandableVolumePlugin := resizeOp.volumePlugin - - var err error - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - - resizeResponse := inTreeResizeResponse{ - pvc: pvc, - pv: pv, +func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) { + if volumeToMount.VolumeSpec != nil && + volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { + klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName) + return false, nil } - if permitNodeExpansion(pvc, pv) { - // File system resize was requested, proceed - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) - - rsOpts.VolumeSpec = volumeToMount.VolumeSpec - rsOpts.NewSize = pvSpecCap - rsOpts.OldSize = pvcStatusCap - pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient) - - if err != nil { - msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) - klog.Errorf(msg.Error()) - resizeResponse.err = msg - return resizeResponse - } - - resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) - resizeResponse.resizeCalled = true - - if resizeErr != nil { - if volumetypes.IsOperationFinishedError(resizeErr) { - var markFailedError error - pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient) - // update the pvc with node expansion object - resizeResponse.pvc = pvc - resizeResponse.assumeResizeOpAsFinished = true - if markFailedError != nil { - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) - } - } - - // if driver returned FailedPrecondition error that means - // volume expansion should not be retried on this node but - // expansion operation should not block mounting - if volumetypes.IsFailedPreconditionError(resizeErr) { - actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse - } - - resizeResponse.err = resizeErr - return resizeResponse - } - resizeResponse.resizeFinished = resizeDone - - // Volume resizing is not done but it did not error out. This could happen if a CSI volume - // does not have node stage_unstage capability but was asked to resize the volume before - // node publish. In which case - we must retry resizing after node publish. - if !resizeDone { - return resizeResponse - } - - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) - - // File system resize succeeded, now update the PVC's Capacity to match the PV's - pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) - resizeResponse.pvc = pvc - - if err != nil { - resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) - // On retry, NodeExpandVolume will be called again but do nothing - return resizeResponse - } - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse + // Get expander, if possible + expandableVolumePlugin, _ := + og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec) + if expandableVolumePlugin != nil && + expandableVolumePlugin.RequiresFSResize() && + volumeToMount.VolumeSpec.PersistentVolume != nil { + return true, expandableVolumePlugin } - // somehow a resize operation was queued, but we can not perform any resizing because - // prechecks required for node expansion failed. Kubelet should not retry expanding the volume. - resizeResponse.assumeResizeOpAsFinished = true - return resizeResponse + return false, nil +} + +func (og *operationGenerator) nodeExpandVolume( + volumeToMount VolumeToMount, + actualStateOfWorld ActualStateOfWorldMounterUpdater, + rsOpts volume.NodeResizeOptions) (bool, error) { + + supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount) + + if supportsExpansion { + // lets use sizes handed over to us by caller for comparison + if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 { + pv := volumeToMount.VolumeSpec.PersistentVolume + pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) + if err != nil { + // Return error rather than leave the file system un-resized, caller will log and retry + return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err) + } + + if volumeToMount.VolumeSpec.ReadOnly { + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system") + klog.Warningf(detailedMsg) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) + return true, nil + } + resizeOp := nodeResizeOperationOpts{ + vmt: volumeToMount, + pvc: pvc, + pv: pv, + pluginResizeOpts: rsOpts, + volumePlugin: expandableVolumePlugin, + actualStateOfWorld: actualStateOfWorld, + } + if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { + nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) + resizeFinished, err, _ := nodeExpander.expandOnPlugin() + return resizeFinished, err + } else { + return og.legacyCallNodeExpandOnPlugin(resizeOp) + } + } + } + return true, nil } // legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support // recovery from volume expansion failure +// TODO: Removing this code when RecoverVolumeExpansionFailure feature goes GA. func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) { pvc := resizeOp.pvc - pv := resizeOp.pv volumeToMount := resizeOp.vmt rsOpts := resizeOp.pluginResizeOpts actualStateOfWorld := resizeOp.actualStateOfWorld expandableVolumePlugin := resizeOp.volumePlugin + pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] + + nodeName := volumeToMount.Pod.Spec.NodeName + var err error - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - if pvcStatusCap.Cmp(pvSpecCap) < 0 { - // File system resize was requested, proceed - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) + // File system resize was requested, proceed + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) - rsOpts.VolumeSpec = volumeToMount.VolumeSpec - rsOpts.NewSize = pvSpecCap - rsOpts.OldSize = pvcStatusCap - resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) - if resizeErr != nil { - // if driver returned FailedPrecondition error that means - // volume expansion should not be retried on this node but - // expansion operation should not block mounting - if volumetypes.IsFailedPreconditionError(resizeErr) { - actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) - klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) - return true, nil - } - return false, resizeErr - } - // Volume resizing is not done but it did not error out. This could happen if a CSI volume - // does not have node stage_unstage capability but was asked to resize the volume before - // node publish. In which case - we must retry resizing after node publish. - if !resizeDone { - return false, nil - } - simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "") - og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) - klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) - // File system resize succeeded, now update the PVC's Capacity to match the PV's - _, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient) - if err != nil { - // On retry, NodeExpandVolume will be called again but do nothing - return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + rsOpts.VolumeSpec = volumeToMount.VolumeSpec + + _, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) + if resizeErr != nil { + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error()) + return true, nil } + return false, resizeErr + } + + simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName) + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) + klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) + + // if PVC already has new size, there is no need to update it. + if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 { return true, nil } + + // File system resize succeeded, now update the PVC's Capacity to match the PV's + _, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient) + if err != nil { + // On retry, NodeExpandVolume will be called again but do nothing + return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err) + } return true, nil } -func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] - // if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded - if pvcStatusCap.Cmp(pvSpecCap) >= 0 { - return false - } - - resizeStatus := pvc.Status.ResizeStatus - // if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on - // the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where - // resizeStatus may not be set (old control-plane expansion controller etc). - if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress { - return true - } else { - klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus) - return false - } -} - func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go index c86613d54c5..1ae2994deb1 100644 --- a/pkg/volume/util/operationexecutor/operation_generator_test.go +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -17,9 +17,11 @@ limitations under the License. package operationexecutor import ( + "fmt" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" @@ -209,101 +211,130 @@ func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) { } } -func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) { - var tests = []struct { - name string - pvc *v1.PersistentVolumeClaim - pv *v1.PersistentVolume - recoverFeatureGate bool - - // expectations of test - expectedResizeStatus v1.PersistentVolumeClaimResizeStatus - expectedStatusSize resource.Quantity - expectResizeCall bool - assumeResizeOpAsFinished bool - expectError bool - }{ - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", - pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed), - pv: getTestPV("test-vol0", "2G"), - recoverFeatureGate: true, - - expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, - expectResizeCall: false, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("1G"), - }, - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", - pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), - pv: getTestPV("test-vol0", "2G"), - recoverFeatureGate: true, - expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, - expectResizeCall: true, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("2G"), - }, - { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", - pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), - pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), - recoverFeatureGate: true, - expectError: true, - expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, - expectResizeCall: true, - assumeResizeOpAsFinished: true, - expectedStatusSize: resource.MustParse("1G"), - }, +func TestOperationGenerator_nodeExpandVolume(t *testing.T) { + getSizeFunc := func(size string) *resource.Quantity { + x := resource.MustParse(size) + return &x } + var tests = []struct { + name string + pvc *v1.PersistentVolumeClaim + pv *v1.PersistentVolume + + // desired size, defaults to pv.Spec.Capacity + desiredSize *resource.Quantity + // actualSize, defaults to pvc.Status.Capacity + actualSize *resource.Quantity + + // expectations of test + expectedResizeStatus v1.PersistentVolumeClaimResizeStatus + expectedStatusSize resource.Quantity + resizeCallCount int + expectError bool + }{ + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed", + pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + resizeCallCount: 0, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending", + pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV("test-vol0", "2G"), + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + resizeCallCount: 1, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", + pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending), + pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + expectError: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + resizeCallCount: 1, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize = actualSize", + pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "2G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + resizeCallCount: 0, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize > actualSize", + pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress), + pv: getTestPV("test-vol0", "2G"), + desiredSize: getSizeFunc("2G"), + actualSize: getSizeFunc("1G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress, + resizeCallCount: 1, + expectedStatusSize: resource.MustParse("2G"), + }, + { + name: "pv.spec.cap = pvc.status.cap, resizeStatus=node-expansion-failed, desiredSize > actualSize", + pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNodeExpansionFailed), + pv: getTestPV("test-vol0", "2G"), + desiredSize: getSizeFunc("2G"), + actualSize: getSizeFunc("1G"), + + expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed, + resizeCallCount: 0, + expectedStatusSize: resource.MustParse("2G"), + }, + } for i := range tests { test := tests[i] t.Run(test.name, func(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)() volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + test.pv.Spec.ClaimRef = &v1.ObjectReference{ + Namespace: test.pvc.Namespace, + Name: test.pvc.Name, + } pvc := test.pvc pv := test.pv pod := getTestPod("test-pod", pvc.Name) - og := getTestOperationGenerator(volumePluginMgr, pvc, pv) - + og := getTestOperatorGeneratorWithPVPVC(volumePluginMgr, pvc, pv) vmt := VolumeToMount{ Pod: pod, VolumeName: v1.UniqueVolumeName(pv.Name), VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), } - resizeOp := nodeResizeOperationOpts{ - pvc: pvc, - pv: pv, - volumePlugin: fakePlugin, - vmt: vmt, - actualStateOfWorld: nil, + desiredSize := test.desiredSize + if desiredSize == nil { + desiredSize = pv.Spec.Capacity.Storage() } + actualSize := test.actualSize + if actualSize == nil { + actualSize = pvc.Status.Capacity.Storage() + } + pluginResizeOpts := volume.NodeResizeOptions{ + VolumeSpec: vmt.VolumeSpec, + NewSize: *desiredSize, + OldSize: *actualSize, + } + ogInstance, _ := og.(*operationGenerator) - expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp) + _, err := ogInstance.nodeExpandVolume(vmt, nil, pluginResizeOpts) - pvc = expansionResponse.pvc - pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] - - if !test.expectError && expansionResponse.err != nil { - t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err) + if !test.expectError && err != nil { + t.Errorf("For test %s, expected no error got: %v", test.name, err) } - if test.expectError && expansionResponse.err == nil { + if test.expectError && err == nil { t.Errorf("For test %s, expected error but got none", test.name) } - - if test.expectResizeCall != expansionResponse.resizeCalled { - t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled) - } - if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished { - t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished) - } - if test.expectedResizeStatus != *pvc.Status.ResizeStatus { - t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus) - } - if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { - t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) + if test.resizeCallCount != fakePlugin.NodeExpandCallCount { + t.Errorf("for test %s, expected node-expand call count to be %d, got %d", test.name, test.resizeCallCount, fakePlugin.NodeExpandCallCount) } }) } @@ -353,9 +384,7 @@ func getTestPVC(volumeName string, specSize, statusSize, allocatedSize string, r if len(allocatedSize) > 0 { pvc.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: resource.MustParse(allocatedSize)} } - if len(resizeStatus) > 0 { - pvc.Status.ResizeStatus = &resizeStatus - } + pvc.Status.ResizeStatus = &resizeStatus return pvc } @@ -423,6 +452,31 @@ func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr, objects return operationGenerator } +func getTestOperatorGeneratorWithPVPVC(volumePluginMgr *volume.VolumePluginMgr, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) OperationGenerator { + fakeKubeClient := fakeclient.NewSimpleClientset(pvc, pv) + fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { + return true, pvc, nil + }) + fakeKubeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) { + return true, pv, nil + }) + fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + return true, pvc, nil + } + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + operationGenerator := NewOperationGenerator( + fakeKubeClient, + volumePluginMgr, + fakeRecorder, + fakeHandler) + return operationGenerator +} + func getTestVolumeToUnmount(pod *v1.Pod, pvSpec v1.PersistentVolumeSpec, pluginName string) MountedVolume { volumeSpec := &volume.Spec{ PersistentVolume: &v1.PersistentVolume{