Do not update PVC if it already has updated size

This commit is contained in:
Hemant Kumar 2022-03-08 16:11:49 -05:00
parent c0fbd83cde
commit 7a43406138
15 changed files with 514 additions and 439 deletions

View File

@ -582,8 +582,9 @@ func (asw *actualStateOfWorld) GetAttachState(
return AttachStateDetached
}
// SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller.
func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) {
klog.V(5).Infof("doing nothing")
klog.V(5).Infof("NO-OP")
}
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {

View File

@ -107,7 +107,7 @@ type ActualStateOfWorld interface {
// volumes, depend on this to update the contents of the volume.
// All volume mounting calls should be idempotent so a second mount call for
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize *resource.Quantity) (bool, string, error)
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error)
// PodRemovedFromVolume returns true if the given pod does not exist in the list of
// mountedPods for the given volume in the cache, indicating that the pod has
@ -161,11 +161,6 @@ type ActualStateOfWorld interface {
// no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume
// MarkFSResizeRequired marks each volume that is successfully attached and
// mounted for the specified pod as requiring file system resize (if the plugin for the
// volume indicates it requires file system resize).
MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName)
// GetAttachedVolumes returns a list of volumes that is known to be attached
// to the node. This list can be used to determine volumes that are either in-use
// or have a mount/unmount operation pending.
@ -329,10 +324,6 @@ type mountedPod struct {
// volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string
// fsResizeRequired indicates the underlying volume has been successfully
// mounted to this pod but its size has been expanded after that.
fsResizeRequired bool
// volumeMountStateForPod stores state of volume mount for the pod. if it is:
// - VolumeMounted: means volume for pod has been successfully mounted
// - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted
@ -552,30 +543,17 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
return nil
}
func (asw *actualStateOfWorld) MarkVolumeAsResized(
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) error {
func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return fmt.Errorf(
"no volume with the name %q exists in the list of attached volumes",
volumeName)
volumeObj, ok := asw.attachedVolumes[volumeName]
if ok {
volumeObj.persistentVolumeSize = claimSize
asw.attachedVolumes[volumeName] = volumeObj
return true
}
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
return fmt.Errorf(
"no pod with the name %q exists in the mounted pods list of volume %s",
podName,
volumeName)
}
klog.V(5).InfoS("Pod volume has been resized", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
podObj.fsResizeRequired = false
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
return nil
return false
}
func (asw *actualStateOfWorld) MarkRemountRequired(
@ -600,40 +578,6 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
}
}
func (asw *actualStateOfWorld) MarkFSResizeRequired(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
klog.InfoS("MarkFSResizeRequired for volume failed as volume does not exist", "volumeName", volumeName)
return
}
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
klog.InfoS("MarkFSResizeRequired for volume failed because the pod does not exist", "uniquePodName", podName, "volumeName", volumeName)
return
}
volumePlugin, err :=
asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.ErrorS(nil, "MarkFSResizeRequired failed to find expandable plugin for volume", "uniquePodName", podObj.podName, "volumeName", volumeObj.volumeName, "volumeSpecName", podObj.volumeSpec.Name())
return
}
if volumePlugin.RequiresFSResize() {
if !podObj.fsResizeRequired {
klog.V(3).InfoS("PVC volume of the pod requires file system resize", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
podObj.fsResizeRequired = true
}
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
}
}
func (asw *actualStateOfWorld) SetDeviceMountState(
volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error {
asw.Lock()
@ -708,10 +652,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
return nil
}
func (asw *actualStateOfWorld) PodExistsInVolume(
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName,
desiredVolumeSize *resource.Quantity) (bool, string, error) {
func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) {
asw.RLock()
defer asw.RUnlock()
@ -729,36 +670,40 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
if asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize) {
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume {
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize)
}
}
return podExists, volumeObj.devicePath, nil
}
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize *resource.Quantity) bool {
if volumeObj.volumeInUseErrorForExpansion {
return false
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) {
currentSize := resource.Quantity{}
if volumeObj.persistentVolumeSize != nil {
currentSize = volumeObj.persistentVolumeSize.DeepCopy()
}
if volumeObj.persistentVolumeSize == nil || desiredVolumeSize == nil {
return false
if volumeObj.volumeInUseErrorForExpansion {
return currentSize, false
}
if volumeObj.persistentVolumeSize == nil || desiredVolumeSize.IsZero() {
return currentSize, false
}
if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 {
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.Errorf(
"PodExistsInVolume failed to find expandable plugin volume: %q (volSpecName: %q)",
volumeObj.volumeName, volumeObj.spec.Name())
return false
klog.InfoS("PodExistsInVolume failed to find expandable plugin",
"volume", volumeObj.volumeName,
"volumeSpecName", volumeObj.spec.Name())
return currentSize, false
}
if volumePlugin.RequiresFSResize() {
return true
return currentSize, true
}
}
return false
return currentSize, false
}
func (asw *actualStateOfWorld) PodRemovedFromVolume(
@ -1005,29 +950,31 @@ func newRemountRequiredError(
// fsResizeRequiredError is an error returned when PodExistsInVolume() found
// volume/pod attached/mounted but fsResizeRequired was true, indicating the
// given volume receives an resize request after attached/mounted.
type fsResizeRequiredError struct {
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
type FsResizeRequiredError struct {
CurrentSize resource.Quantity
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
}
func (err fsResizeRequiredError) Error() string {
func (err FsResizeRequiredError) Error() string {
return fmt.Sprintf(
"volumeName %q mounted to %q needs to resize file system",
err.volumeName, err.podName)
}
func newFsResizeRequiredError(
volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error {
return fsResizeRequiredError{
volumeName: volumeName,
podName: podName,
volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, currentSize resource.Quantity) error {
return FsResizeRequiredError{
CurrentSize: currentSize,
volumeName: volumeName,
podName: podName,
}
}
// IsFSResizeRequiredError returns true if the specified error is a
// fsResizeRequiredError.
func IsFSResizeRequiredError(err error) bool {
_, ok := err.(fsResizeRequiredError)
_, ok := err.(FsResizeRequiredError)
return ok
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"k8s.io/apimachinery/pkg/api/resource"
"testing"
"github.com/stretchr/testify/require"
@ -676,7 +677,7 @@ func TestUncertainVolumeMounts(t *testing.T) {
t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
}
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, nil)
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{})
if volExists {
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
}
@ -762,7 +763,7 @@ func verifyPodExistsInVolumeAsw(
expectedDevicePath string,
asw ActualStateOfWorld) {
podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(expectedPodName, expectedVolumeName, nil)
asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{})
if err != nil {
t.Fatalf(
"ASW PodExistsInVolume failed. Expected: <no error> Actual: <%v>", err)
@ -804,7 +805,7 @@ func verifyPodDoesntExistInVolumeAsw(
expectVolumeToExist bool,
asw ActualStateOfWorld) {
podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(podToCheck, volumeToCheck, nil)
asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{})
if !expectVolumeToExist && err == nil {
t.Fatalf(
"ASW PodExistsInVolume did not return error. Expected: <error indicating volume does not exist> Actual: <%v>", err)

View File

@ -135,7 +135,6 @@ type DesiredStateOfWorld interface {
// be mounted to PodName.
type VolumeToMount struct {
operationexecutor.VolumeToMount
PersistentVolumeSize *resource.Quantity
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
@ -435,23 +434,25 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */)
for volumeName, volumeObj := range dsw.volumesToMount {
for podName, podObj := range volumeObj.podsToMount {
volumesToMount = append(
volumesToMount,
VolumeToMount{
VolumeToMount: operationexecutor.VolumeToMount{
VolumeName: volumeName,
PodName: podName,
Pod: podObj.pod,
VolumeSpec: podObj.volumeSpec,
PluginIsAttachable: volumeObj.pluginIsAttachable,
PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable,
OuterVolumeSpecName: podObj.outerVolumeSpecName,
VolumeGidValue: volumeObj.volumeGidValue,
ReportedInUse: volumeObj.reportedInUse,
MountRequestTime: podObj.mountRequestTime,
DesiredSizeLimit: volumeObj.desiredSizeLimit},
PersistentVolumeSize: volumeObj.persistentVolumeSize,
})
vmt := VolumeToMount{
VolumeToMount: operationexecutor.VolumeToMount{
VolumeName: volumeName,
PodName: podName,
Pod: podObj.pod,
VolumeSpec: podObj.volumeSpec,
PluginIsAttachable: volumeObj.pluginIsAttachable,
PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable,
OuterVolumeSpecName: podObj.outerVolumeSpecName,
VolumeGidValue: volumeObj.volumeGidValue,
ReportedInUse: volumeObj.reportedInUse,
MountRequestTime: podObj.mountRequestTime,
DesiredSizeLimit: volumeObj.desiredSizeLimit,
},
}
if volumeObj.persistentVolumeSize != nil {
vmt.PersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy()
}
volumesToMount = append(volumesToMount, vmt)
}
}
return volumesToMount

View File

@ -977,7 +977,7 @@ func TestCheckVolumeFSResize(t *testing.T) {
},
verify: func(t *testing.T, vols []v1.UniqueVolumeName, volName v1.UniqueVolumeName) {
if len(vols) == 0 {
t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired")
t.Fatalf("Requested resize for volume, but volume in ASW hasn't been marked as fsResizeRequired")
}
if len(vols) != 1 {
t.Errorf("Some unexpected volumes are marked as fsResizeRequired: %v", vols)
@ -1053,7 +1053,7 @@ func TestCheckVolumeFSResize(t *testing.T) {
func() {
tc.resize(t, pv, pvc, dswp)
resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW)
resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage())
tc.verify(t, resizeRequiredVolumes, uniqueVolumeName)
}()
@ -1099,16 +1099,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te
}
func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName,
dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName {
dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
dswp.ReprocessPod(uniquePodName)
dswp.findAndAddNewPods()
return getResizeRequiredVolumes(dsw, asw)
return getResizeRequiredVolumes(dsw, asw, newSize)
}
func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName {
func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
resizeRequiredVolumes := []v1.UniqueVolumeName{}
for _, volumeToMount := range dsw.GetVolumesToMount() {
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, nil)
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize)
if cache.IsFSResizeRequiredError(err) {
resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName)
}

View File

@ -28,6 +28,8 @@ import (
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilpath "k8s.io/utils/path"
@ -203,14 +205,15 @@ func (rc *reconciler) mountOrAttachVolumes() {
} else if !volMounted || cache.IsRemountRequiredError(err) {
rc.mountAttachedVolumes(volumeToMount, err)
} else if cache.IsFSResizeRequiredError(err) {
rc.expandVolume(volumeToMount)
fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
}
}
}
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount) {
func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld)
err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))

View File

@ -1284,10 +1284,11 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// mark volume as resize required
asw.MarkFSResizeRequired(volumeName, podName)
t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
newSize := tc.newPVSize.DeepCopy()
dsw.UpdatePersistentVolumeSize(volumeName, &newSize)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, nil)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize)
if tc.expansionFailed {
if cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
@ -1299,7 +1300,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil)
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize)
return mounted && err == nil, nil
})
if waitErr != nil {
@ -1791,7 +1792,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil)
mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{})
if mounted || err != nil {
return false, nil
}

View File

@ -495,7 +495,6 @@ func (spec *Spec) IsKubeletExpandable() bool {
return spec.PersistentVolume.Spec.FlexVolume != nil
default:
return false
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package operationexecutor
import (
"k8s.io/apimachinery/pkg/api/resource"
"time"
v1 "k8s.io/api/core/v1"
@ -108,7 +109,7 @@ func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeC
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type NodeExpander struct {
nodeResizeOperationOpts
kubeClient clientset.Interface
recorder record.EventRecorder
// computed via precheck
pvcStatusCap resource.Quantity
pvCap resource.Quantity
resizeStatus *v1.PersistentVolumeClaimResizeStatus
// pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet
// PVC has already been updated - possibly because expansion already succeeded on different node.
// This can happen when a RWX PVC is expanded.
pvcAlreadyUpdated bool
}
func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander {
return &NodeExpander{
kubeClient: client,
nodeResizeOperationOpts: resizeOp,
recorder: recorder,
}
}
// testResponseData is merely used for doing sanity checks in unit tests
type testResponseData struct {
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalledOnPlugin bool
// Indicates whether kubelet should assume resize operation as finished.
// For kubelet - resize operation could be assumed as finished even if
// actual resizing is *not* finished. This can happen, because certain prechecks
// are failing and kubelet should not retry expansion, or it could happen
// because resize operation is genuinely finished.
assumeResizeFinished bool
}
// runPreCheck performs some sanity checks before expansion can be performed on the PVC.
func (ne *NodeExpander) runPreCheck() bool {
ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage]
ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage]
ne.resizeStatus = ne.pvc.Status.ResizeStatus
// PVC is already expanded but we are still trying to expand the volume because
// last recorded size in ASOW is older. This can happen for RWX volume types.
if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && (ne.resizeStatus == nil || *ne.resizeStatus == v1.PersistentVolumeClaimNoExpansionInProgress) {
ne.pvcAlreadyUpdated = true
}
// if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we
// should allow volume expansion on the node to proceed. We are making an exception for
// resizeStatus being nil because it will support use cases where
// resizeStatus may not be set (old control-plane expansion controller etc).
if ne.resizeStatus == nil ||
ne.pvcAlreadyUpdated ||
*ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending ||
*ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress {
return true
}
return false
}
func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
allowExpansion := ne.runPreCheck()
if !allowExpansion {
return false, nil, testResponseData{false, true}
}
var err error
if !ne.pvcAlreadyUpdated {
ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient)
if err != nil {
msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error())
return false, err, testResponseData{}
}
}
_, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts)
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient)
if markFailedError != nil {
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName)
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
}
return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
}
simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod))
// no need to update PVC object if we already updated it
if ne.pvcAlreadyUpdated {
return true, nil, testResponseData{true, true}
}
// File system resize succeeded, now update the PVC's Capacity to match the PV's
ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient)
if err != nil {
return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), testResponseData{true, true}
}
return true, nil, testResponseData{true, true}
}

View File

@ -0,0 +1,135 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"testing"
)
func TestNodeExpander(t *testing.T) {
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
recoverFeatureGate bool
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
assumeResizeOpAsFinished bool
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
recoverFeatureGate: true,
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
pvc := test.pvc
pv := test.pv
pod := getTestPod("test-pod", pvc.Name)
og := getTestOperationGenerator(volumePluginMgr, pvc, pv)
vmt := VolumeToMount{
Pod: pod,
VolumeName: v1.UniqueVolumeName(pv.Name),
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
}
resizeOp := nodeResizeOperationOpts{
pvc: pvc,
pv: pv,
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
pluginResizeOpts: volume.NodeResizeOptions{
VolumeSpec: vmt.VolumeSpec,
NewSize: *pv.Spec.Capacity.Storage(),
OldSize: *pvc.Status.Capacity.Storage(),
},
}
ogInstance, _ := og.(*operationGenerator)
nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder)
_, err, expansionResponse := nodeExpander.expandOnPlugin()
pvc = nodeExpander.pvc
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
if !test.expectError && err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, err)
}
if test.expectError && err == nil {
t.Errorf("For test %s, expected error but got none", test.name)
}
if test.expectResizeCall != expansionResponse.resizeCalledOnPlugin {
t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalledOnPlugin)
}
if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeFinished {
t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeFinished)
}
if test.expectedResizeStatus != *pvc.Status.ResizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
}
})
}
}

View File

@ -148,7 +148,7 @@ type OperationExecutor interface {
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
// CheckVolumeExistenceOperation checks volume existence
@ -201,7 +201,7 @@ type ActualStateOfWorldMounterUpdater interface {
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
// Marks the specified volume's file system resize request is finished.
MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool
// GetDeviceMountState returns mount state of the device in global path
GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
@ -423,6 +423,10 @@ type VolumeToMount struct {
// time at which volume was requested to be mounted
MountRequestTime time.Time
// PersistentVolumeSize stores desired size of the volume.
// usually this is the size if pv.Spec.Capacity
PersistentVolumeSize resource.Quantity
}
// DeviceMountState represents device mount state in a global path.
@ -997,8 +1001,8 @@ func (oe *operationExecutor) UnmountDevice(
deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
}
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld)
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error {
generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize)
if err != nil {
return err
}

View File

@ -18,6 +18,7 @@ package operationexecutor
import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"strconv"
"testing"
"time"
@ -668,7 +669,7 @@ func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.P
}, nil
}
func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return volumetypes.NewOperationContext(nil, nil, false)

View File

@ -87,6 +87,16 @@ type operationGenerator struct {
translator InTreeToCSITranslator
}
type inTreeResizeResponse struct {
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
err error
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalled bool
}
// NewOperationGenerator is returns instance of operationGenerator
func NewOperationGenerator(kubeClient clientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
@ -150,7 +160,7 @@ type OperationGenerator interface {
GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error)
// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error)
}
type inTreeResizeOpts struct {
@ -161,27 +171,6 @@ type inTreeResizeOpts struct {
volumePlugin volume.ExpandableVolumePlugin
}
type inTreeResizeResponse struct {
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
err error
// Indicates whether kubelet should assume resize operation as finished.
// For kubelet - resize operation could be assumed as finished even if
// actual resizing is *not* finished. This can happen, because certain prechecks
// are failing and kubelet should not retry expansion, or it could happen
// because resize operation is genuinely finished.
assumeResizeOpAsFinished bool
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalled bool
// indicates whether entire volume expansion is finished or not
// only used from nodeExpansion calls. Mainly used for testing.
resizeFinished bool
}
type nodeResizeOperationOpts struct {
vmt VolumeToMount
pvc *v1.PersistentVolumeClaim
@ -712,7 +701,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
resizeOptions.DeviceMountPath = volumeMounter.GetPath()
_, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
_, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
@ -1205,7 +1194,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
DevicePath: devicePath,
DeviceStagePath: stagingPath,
}
_, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
_, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
@ -1910,7 +1899,7 @@ func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolu
func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
@ -1923,9 +1912,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
var eventErr, detailedErr error
migrated := false
if currentSize.IsZero() || volumeToMount.PersistentVolumeSize.IsZero() {
err := fmt.Errorf("current or new size of the volume is not set")
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
resizeOptions := volume.NodeResizeOptions{
VolumeSpec: volumeToMount.VolumeSpec,
DevicePath: volumeToMount.DevicePath,
OldSize: currentSize,
NewSize: volumeToMount.PersistentVolumeSize,
}
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
if err != nil {
@ -2027,10 +2024,11 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, e1, e2
}
if resizeDone {
markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName)
if markFSResizedErr != nil {
markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize)
if !markingDone {
// On failure, return error. Caller will log and retry.
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr)
genericFailureError := fmt.Errorf("unable to mark volume as resized")
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError)
return false, e1, e2
}
return true, nil, nil
@ -2038,25 +2036,9 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, nil, nil
}
func (og *operationGenerator) nodeExpandVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
rsOpts volume.NodeResizeOptions) (bool, error) {
if volumeToMount.VolumeSpec != nil &&
volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
return true, nil
}
// Get expander, if possible
expandableVolumePlugin, _ :=
og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
if expandableVolumePlugin != nil &&
expandableVolumePlugin.RequiresFSResize() &&
volumeToMount.VolumeSpec.PersistentVolume != nil {
func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) {
supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
if supportsExpansion {
pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
@ -2071,193 +2053,143 @@ func (og *operationGenerator) nodeExpandVolume(
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil
}
resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount,
pvc: pvc,
pv: pv,
pluginResizeOpts: rsOpts,
volumePlugin: expandableVolumePlugin,
actualStateOfWorld: actualStateOfWorld,
}
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
resizeResponse := og.callNodeExpandOnPlugin(resizeOp)
return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
if pvcStatusCap.Cmp(pvSpecCap) < 0 {
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount,
pvc: pvc,
pv: pv,
pluginResizeOpts: rsOpts,
volumePlugin: expandablePlugin,
actualStateOfWorld: actualStateOfWorld,
}
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin()
return resizeFinished, err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
}
}
}
return true, nil
}
// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support
// recovery from volume expansion failure.
func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse {
pvc := resizeOp.pvc
pv := resizeOp.pv
volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin
var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
resizeResponse := inTreeResizeResponse{
pvc: pvc,
pv: pv,
func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) {
if volumeToMount.VolumeSpec != nil &&
volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
return false, nil
}
if permitNodeExpansion(pvc, pv) {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient)
if err != nil {
msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error())
resizeResponse.err = msg
return resizeResponse
}
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
resizeResponse.resizeCalled = true
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient)
// update the pvc with node expansion object
resizeResponse.pvc = pvc
resizeResponse.assumeResizeOpAsFinished = true
if markFailedError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
}
resizeResponse.err = resizeErr
return resizeResponse
}
resizeResponse.resizeFinished = resizeDone
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return resizeResponse
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
resizeResponse.pvc = pvc
if err != nil {
resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
// On retry, NodeExpandVolume will be called again but do nothing
return resizeResponse
}
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
// Get expander, if possible
expandableVolumePlugin, _ :=
og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
if expandableVolumePlugin != nil &&
expandableVolumePlugin.RequiresFSResize() &&
volumeToMount.VolumeSpec.PersistentVolume != nil {
return true, expandableVolumePlugin
}
// somehow a resize operation was queued, but we can not perform any resizing because
// prechecks required for node expansion failed. Kubelet should not retry expanding the volume.
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
return false, nil
}
func (og *operationGenerator) nodeExpandVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
rsOpts volume.NodeResizeOptions) (bool, error) {
supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
if supportsExpansion {
// lets use sizes handed over to us by caller for comparison
if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 {
pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
// Return error rather than leave the file system un-resized, caller will log and retry
return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
}
if volumeToMount.VolumeSpec.ReadOnly {
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil
}
resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount,
pvc: pvc,
pv: pv,
pluginResizeOpts: rsOpts,
volumePlugin: expandableVolumePlugin,
actualStateOfWorld: actualStateOfWorld,
}
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin()
return resizeFinished, err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
}
}
}
return true, nil
}
// legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support
// recovery from volume expansion failure
func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) {
pvc := resizeOp.pvc
pv := resizeOp.pv
volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
if pvcStatusCap.Cmp(pvSpecCap) < 0 {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return true, nil
}
return false, resizeErr
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return false, nil
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
_, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
_, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error())
return true, nil
}
return false, resizeErr
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// if PVC already has new size, there is no need to update it.
if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 {
return true, nil
}
// File system resize succeeded, now update the PVC's Capacity to match the PV's
_, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
}
return true, nil
}
func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
// if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded
if pvcStatusCap.Cmp(pvSpecCap) >= 0 {
return false
}
resizeStatus := pvc.Status.ResizeStatus
// if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on
// the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where
// resizeStatus may not be set (old control-plane expansion controller etc).
if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress {
return true
} else {
klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus)
return false
}
}
func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)

View File

@ -209,106 +209,6 @@ func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) {
}
}
func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) {
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
recoverFeatureGate bool
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
assumeResizeOpAsFinished bool
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
recoverFeatureGate: true,
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
pvc := test.pvc
pv := test.pv
pod := getTestPod("test-pod", pvc.Name)
og := getTestOperationGenerator(volumePluginMgr, pvc, pv)
vmt := VolumeToMount{
Pod: pod,
VolumeName: v1.UniqueVolumeName(pv.Name),
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
}
resizeOp := nodeResizeOperationOpts{
pvc: pvc,
pv: pv,
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
}
ogInstance, _ := og.(*operationGenerator)
expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp)
pvc = expansionResponse.pvc
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
if !test.expectError && expansionResponse.err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err)
}
if test.expectError && expansionResponse.err == nil {
t.Errorf("For test %s, expected error but got none", test.name)
}
if test.expectResizeCall != expansionResponse.resizeCalled {
t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled)
}
if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished {
t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished)
}
if test.expectedResizeStatus != *pvc.Status.ResizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
}
})
}
}
func getTestPod(podName, pvcName string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{