diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 1be17a4d756..d04385d259d 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -265,6 +265,10 @@ type attachedVolume struct { // deviceMountPath contains the path on the node where the device should // be mounted after it is attached. deviceMountPath string + + // volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error + // for this volume and volume expansion on this node should not be retried + volumeInUseErrorForExpansion bool } // The mountedPod object represents a pod for which the kubelet volume manager @@ -381,6 +385,17 @@ func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeNam return volumeObj.deviceMountState } +func (asw *actualStateOfWorld) MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) { + asw.Lock() + defer asw.Unlock() + + volumeObj, ok := asw.attachedVolumes[volumeName] + if ok { + volumeObj.volumeInUseErrorForExpansion = true + asw.attachedVolumes[volumeName] = volumeObj + } +} + func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState { asw.RLock() defer asw.RUnlock() @@ -672,6 +687,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume( return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } if podObj.fsResizeRequired && + !volumeObj.volumeInUseErrorForExpansion && utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) { return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index fc2401c0559..8f219323d30 100644 --- 
a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -1001,16 +1001,42 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { fsMode := v1.PersistentVolumeFilesystem var tests = []struct { - name string - volumeMode *v1.PersistentVolumeMode + name string + volumeMode *v1.PersistentVolumeMode + expansionFailed bool + pvName string + pvcSize resource.Quantity + pvcStatusSize resource.Quantity + oldPVSize resource.Quantity + newPVSize resource.Quantity }{ { - name: "expand-fs-volume", - volumeMode: &fsMode, + name: "expand-fs-volume", + volumeMode: &fsMode, + pvName: "pv", + pvcSize: resource.MustParse("10G"), + pvcStatusSize: resource.MustParse("10G"), + newPVSize: resource.MustParse("15G"), + oldPVSize: resource.MustParse("10G"), }, { - name: "expand-raw-block", - volumeMode: &blockMode, + name: "expand-raw-block", + volumeMode: &blockMode, + pvName: "pv", + pvcSize: resource.MustParse("10G"), + pvcStatusSize: resource.MustParse("10G"), + newPVSize: resource.MustParse("15G"), + oldPVSize: resource.MustParse("10G"), + }, + { + name: "expand-fs-volume with in-use error", + volumeMode: &fsMode, + expansionFailed: true, + pvName: volumetesting.FailWithInUseVolumeName, + pvcSize: resource.MustParse("10G"), + pvcStatusSize: resource.MustParse("10G"), + newPVSize: resource.MustParse("15G"), + oldPVSize: resource.MustParse("13G"), }, } @@ -1018,12 +1044,15 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { t.Run(tc.name, func(t *testing.T) { pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ - Name: "pv", + Name: tc.pvName, UID: "pvuid", }, Spec: v1.PersistentVolumeSpec{ ClaimRef: &v1.ObjectReference{Name: "pvc"}, VolumeMode: tc.volumeMode, + Capacity: v1.ResourceList{ + v1.ResourceStorage: tc.oldPVSize, + }, }, } pvc := &v1.PersistentVolumeClaim{ @@ -1032,9 +1061,19 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) 
{ UID: "pvcuid", }, Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: tc.pvcSize, + }, + }, VolumeName: "pv", VolumeMode: tc.volumeMode, }, + Status: v1.PersistentVolumeClaimStatus{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: tc.pvcStatusSize, + }, + }, } pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1058,7 +1097,10 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) - kubeClient := createtestClientWithPVPVC(pv, pvc) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)), + DevicePath: "fake/path", + }) fakeRecorder := &record.FakeRecorder{} fakeHandler := volumetesting.NewBlockVolumePathHandler() oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( @@ -1104,24 +1146,36 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { close(stopChan) <-stoppedChan - // Mark volume as fsResizeRequired. 
+ // Simulate what DSOWP does + pv.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize + volumeSpec = &volume.Spec{PersistentVolume: pv} + dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // mark volume as resize required asw.MarkFSResizeRequired(volumeName, podName) + _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) - if !cache.IsFSResizeRequiredError(podExistErr) { - t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr) + if tc.expansionFailed { + if cache.IsFSResizeRequiredError(podExistErr) { + t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) + } + } else { + if !cache.IsFSResizeRequiredError(podExistErr) { + t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr) + } + + // Start the reconciler again, we hope reconciler will perform the + // resize operation and clear the fsResizeRequired flag for volume. + go reconciler.Run(wait.NeverStop) + + waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { + mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + return mounted && err == nil, nil + }) + if waitErr != nil { + t.Fatal("Volume resize should succeeded") + } } - // Start the reconciler again, we hope reconciler will perform the - // resize operation and clear the fsResizeRequired flag for volume. 
- go reconciler.Run(wait.NeverStop) - - waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName) - return mounted && err == nil, nil - }) - if waitErr != nil { - t.Fatal("Volume resize should succeeded") - } }) } } @@ -1653,6 +1707,12 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) { return true, pv, nil }) + fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + return true, pvc, nil + } + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { return true, nil, fmt.Errorf("no reaction implemented for %s", action) }) diff --git a/pkg/volume/csi/expander.go b/pkg/volume/csi/expander.go index aac2e31e059..bfa6d714716 100644 --- a/pkg/volume/csi/expander.go +++ b/pkg/volume/csi/expander.go @@ -21,12 +21,15 @@ import ( "errors" "fmt" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" api "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var _ volume.NodeExpandableVolumePlugin = &csiPlugin{} @@ -123,7 +126,26 @@ func (c *csiPlugin) nodeExpandWithClient( _, err = csClient.NodeExpandVolume(ctx, opts) if err != nil { - return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err) + if inUseError(err) { + failedConditionErr := fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", volumetypes.NewFailedPreconditionError(err.Error())) + return false, failedConditionErr + } + 
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", err) } return true, nil } + +func inUseError(err error) bool { + st, ok := status.FromError(err) + if !ok { + // not a grpc error + return false + } + // if this is a failed precondition error then that means driver does not support expansion + // of in-use volumes + // More info - https://github.com/container-storage-interface/spec/blob/master/spec.md#nodeexpandvolume-errors + if st.Code() == codes.FailedPrecondition { + return true + } + return false +} diff --git a/pkg/volume/csi/expander_test.go b/pkg/volume/csi/expander_test.go index b53afb1ae45..12c10f5a892 100644 --- a/pkg/volume/csi/expander_test.go +++ b/pkg/volume/csi/expander_test.go @@ -20,19 +20,24 @@ import ( "os" "testing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) func TestNodeExpand(t *testing.T) { tests := []struct { - name string - nodeExpansion bool - nodeStageSet bool - volumePhase volume.CSIVolumePhaseType - success bool - fsVolume bool - deviceStagePath string + name string + nodeExpansion bool + nodeStageSet bool + volumePhase volume.CSIVolumePhaseType + success bool + fsVolume bool + grpcError error + hasVolumeInUseError bool + deviceStagePath string }{ { name: "when node expansion is not set", @@ -76,6 +81,26 @@ func TestNodeExpand(t *testing.T) { success: true, fsVolume: false, }, + { + name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has grpc volume-in-use error", + nodeExpansion: true, + nodeStageSet: true, + volumePhase: volume.CSIVolumePublished, + success: false, + fsVolume: true, + grpcError: status.Error(codes.FailedPrecondition, "volume-in-use"), + hasVolumeInUseError: true, + }, + { + name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has other grpc error", + nodeExpansion: true, + nodeStageSet: true, +
volumePhase: volume.CSIVolumePublished, + success: false, + fsVolume: true, + grpcError: status.Error(codes.InvalidArgument, "invalid-argument"), + hasVolumeInUseError: false, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -99,8 +124,19 @@ func TestNodeExpand(t *testing.T) { fakeCSIClient, _ := csClient.(*fakeCsiDriverClient) fakeNodeClient := fakeCSIClient.nodeClient + + if tc.grpcError != nil { + fakeNodeClient.SetNextError(tc.grpcError) + } + ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient, tc.fsVolume) + if tc.hasVolumeInUseError { + if !volumetypes.IsFailedPreconditionError(err) { + t.Errorf("expected failed precondition error got: %v", err) + } + } + // verify device staging targer path stagingTargetPath := fakeNodeClient.FakeNodeExpansionRequest.GetStagingTargetPath() if tc.deviceStagePath != "" && tc.deviceStagePath != stagingTargetPath { diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index c481f46a102..8bae17af2f3 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -89,6 +89,9 @@ const ( // SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name" + // FailWithInUseVolumeName will cause NodeExpandVolume to result in FailedPrecondition error + FailWithInUseVolumeName = "fail-expansion-in-use" + deviceNotMounted = "deviceNotMounted" deviceMountUncertain = "deviceMountUncertain" deviceMounted = "deviceMounted" @@ -658,6 +661,9 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool { } func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) { + if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName { + return false, volumetypes.NewFailedPreconditionError("volume-in-use") + } return true, nil } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go 
b/pkg/volume/util/operationexecutor/operation_executor.go index 246fb404f3d..e4c287381d4 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -202,6 +202,10 @@ type ActualStateOfWorldMounterUpdater interface { // GetVolumeMountState returns mount state of the volume for the Pod GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState + + // MarkForInUseExpansionError marks the volume to have in-use error during expansion. + // volume expansion must not be retried for this volume + MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) } // ActualStateOfWorldAttacherUpdater defines a set of operations updating the diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 7756578b808..52052e0748f 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -613,7 +613,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // NodeExpandVolume will resize the file system if user has requested a resize of // underlying persistent volume and is allowed to do so. - resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, resizeOptions) + resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) @@ -669,7 +669,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // - Volume does not support DeviceMounter interface. // - In case of CSI the volume does not have node stage_unstage capability. 
if !resizeDone { - _, resizeError = og.nodeExpandVolume(volumeToMount, resizeOptions) + _, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -1083,7 +1083,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc( DeviceStagePath: stagingPath, CSIVolumePhase: volume.CSIVolumePublished, } - _, resizeError := og.nodeExpandVolume(volumeToMount, resizeOptions) + _, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if resizeError != nil { klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) @@ -1587,10 +1587,10 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, resizeOptions volume.NodeResizeOptions) (bool, error, error) { - resizeDone, err := og.nodeExpandVolume(volumeToMount, resizeOptions) + resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) if err != nil { - klog.Errorf("NodeExpandVolume.NodeExpandVolume failed : %v", err) e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err) + klog.Errorf(e2.Error()) return false, e1, e2 } if resizeDone { @@ -1605,7 +1605,10 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, return false, nil, nil } -func (og *operationGenerator) nodeExpandVolume(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) { +func (og *operationGenerator) nodeExpandVolume( + volumeToMount VolumeToMount, + actualStateOfWorld ActualStateOfWorldMounterUpdater, + rsOpts volume.NodeResizeOptions) (bool, error) { if 
!utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) return true, nil @@ -1649,7 +1652,15 @@ func (og *operationGenerator) nodeExpandVolume(volumeToMount VolumeToMount, rsOp rsOpts.OldSize = pvcStatusCap resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) if resizeErr != nil { - return false, fmt.Errorf("MountVolume.NodeExpandVolume failed : %v", resizeErr) + // if driver returned FailedPrecondition error that means + // volume expansion should not be retried on this node but + // expansion operation should not block mounting + if volumetypes.IsFailedPreconditionError(resizeErr) { + actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) + return true, nil + } + return false, resizeErr } // Volume resizing is not done but it did not error out. 
This could happen if a CSI volume // does not have node stage_unstage capability but was asked to resize the volume before diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 8dab5fcfa83..a06e5e18295 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -54,6 +54,28 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { return o.OperationFunc() } +// FailedPrecondition error indicates CSI operation returned failed precondition +// error +type FailedPrecondition struct { + msg string +} + +func (err *FailedPrecondition) Error() string { + return err.msg +} + +// NewFailedPreconditionError returns a new FailedPrecondition error instance +func NewFailedPreconditionError(msg string) *FailedPrecondition { + return &FailedPrecondition{msg: msg} +} + +// IsFailedPreconditionError checks if given error is of type that indicates +// operation failed with precondition +func IsFailedPreconditionError(err error) bool { + var failedPreconditionError *FailedPrecondition + return errors.As(err, &failedPreconditionError) +} + // TransientOperationFailure indicates operation failed with a transient error // and may fix itself when retried. 
type TransientOperationFailure struct { diff --git a/pkg/volume/util/types/types_test.go b/pkg/volume/util/types/types_test.go index 3ff4371be3c..7a3eebb38ae 100644 --- a/pkg/volume/util/types/types_test.go +++ b/pkg/volume/util/types/types_test.go @@ -23,28 +23,55 @@ import ( "k8s.io/utils/mount" ) -func TestIsFilesystemMismatchError(t *testing.T) { +func TestErrorTypes(t *testing.T) { tests := []struct { - mountError error - expectError bool + name string + realError error + errorCheckFunc func(error) bool + expectError bool }{ { + "when mount error has File system mismatch errors", mount.NewMountError(mount.FilesystemMismatch, "filesystem mismatch"), + IsFilesystemMismatchError, true, }, { + "when mount error has other error", mount.NewMountError(mount.FormatFailed, "filesystem mismatch"), + IsFilesystemMismatchError, false, }, { + "when mount error wraps filesystem mismatch error", fmt.Errorf("mount failed %w", mount.NewMountError(mount.FilesystemMismatch, "filesystem mismatch")), + IsFilesystemMismatchError, + true, + }, + { + "when error has no failedPrecondition error", + fmt.Errorf("some other error"), + IsFailedPreconditionError, + false, + }, + { + "when error has failedPrecondition error", + NewFailedPreconditionError("volume-in-use"), + IsFailedPreconditionError, + true, + }, + { + "when error wraps failedPrecondition error", + fmt.Errorf("volume readonly %w", NewFailedPreconditionError("volume-in-use-error")), + IsFailedPreconditionError, true, }, } + for _, test := range tests { - ok := IsFilesystemMismatchError(test.mountError) + ok := test.errorCheckFunc(test.realError) if ok != test.expectError { - t.Errorf("expected filesystem mismatch to be %v but got %v", test.expectError, ok) + t.Errorf("for %s: expected error to be %v but got %v", test.name, test.expectError, ok) } } }