Merge pull request #92986 from gnufied/fix-in-use-errors

Handle volume-in-use error during volume expansion
This commit is contained in:
Kubernetes Prow Robot 2020-07-15 00:30:37 -07:00 committed by GitHub
commit d9c3d15018
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 246 additions and 42 deletions

View File

@ -265,6 +265,10 @@ type attachedVolume struct {
// deviceMountPath contains the path on the node where the device should
// be mounted after it is attached.
deviceMountPath string
// volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error
// for this volume and volume expansion on this node should not be retried
volumeInUseErrorForExpansion bool
}
// The mountedPod object represents a pod for which the kubelet volume manager
@ -381,6 +385,17 @@ func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeNam
return volumeObj.deviceMountState
}
func (asw *actualStateOfWorld) MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) {
asw.Lock()
defer asw.Unlock()
volumeObj, ok := asw.attachedVolumes[volumeName]
if ok {
volumeObj.volumeInUseErrorForExpansion = true
asw.attachedVolumes[volumeName] = volumeObj
}
}
func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState {
asw.RLock()
defer asw.RUnlock()
@ -672,6 +687,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
}
if podObj.fsResizeRequired &&
!volumeObj.volumeInUseErrorForExpansion &&
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
}

View File

@ -1001,16 +1001,42 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
fsMode := v1.PersistentVolumeFilesystem
var tests = []struct {
name string
volumeMode *v1.PersistentVolumeMode
name string
volumeMode *v1.PersistentVolumeMode
expansionFailed bool
pvName string
pvcSize resource.Quantity
pvcStatusSize resource.Quantity
oldPVSize resource.Quantity
newPVSize resource.Quantity
}{
{
name: "expand-fs-volume",
volumeMode: &fsMode,
name: "expand-fs-volume",
volumeMode: &fsMode,
pvName: "pv",
pvcSize: resource.MustParse("10G"),
pvcStatusSize: resource.MustParse("10G"),
newPVSize: resource.MustParse("15G"),
oldPVSize: resource.MustParse("10G"),
},
{
name: "expand-raw-block",
volumeMode: &blockMode,
name: "expand-raw-block",
volumeMode: &blockMode,
pvName: "pv",
pvcSize: resource.MustParse("10G"),
pvcStatusSize: resource.MustParse("10G"),
newPVSize: resource.MustParse("15G"),
oldPVSize: resource.MustParse("10G"),
},
{
name: "expand-fs-volume with in-use error",
volumeMode: &fsMode,
expansionFailed: true,
pvName: volumetesting.FailWithInUseVolumeName,
pvcSize: resource.MustParse("10G"),
pvcStatusSize: resource.MustParse("10G"),
newPVSize: resource.MustParse("15G"),
oldPVSize: resource.MustParse("13G"),
},
}
@ -1018,12 +1044,15 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pv",
Name: tc.pvName,
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: tc.volumeMode,
Capacity: v1.ResourceList{
v1.ResourceStorage: tc.oldPVSize,
},
},
}
pvc := &v1.PersistentVolumeClaim{
@ -1032,9 +1061,19 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: tc.pvcSize,
},
},
VolumeName: "pv",
VolumeMode: tc.volumeMode,
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: v1.ResourceList{
v1.ResourceStorage: tc.pvcStatusSize,
},
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1058,7 +1097,10 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createtestClientWithPVPVC(pv, pvc)
kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
DevicePath: "fake/path",
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
@ -1104,24 +1146,36 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
close(stopChan)
<-stoppedChan
// Mark volume as fsResizeRequired.
// Simulate what DSOWP does
pv.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
volumeSpec = &volume.Spec{PersistentVolume: pv}
dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// mark volume as resize required
asw.MarkFSResizeRequired(volumeName, podName)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName)
if !cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
if tc.expansionFailed {
if cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
}
} else {
if !cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
}
// Start the reconciler again, we hope reconciler will perform the
// resize operation and clear the fsResizeRequired flag for volume.
go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
return mounted && err == nil, nil
})
if waitErr != nil {
t.Fatal("Volume resize should succeeded")
}
}
// Start the reconciler again, we hope reconciler will perform the
// resize operation and clear the fsResizeRequired flag for volume.
go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName)
return mounted && err == nil, nil
})
if waitErr != nil {
t.Fatal("Volume resize should succeeded")
}
})
}
}
@ -1653,6 +1707,12 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
return true, pv, nil
})
fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
return true, pvc, nil
}
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})

View File

@ -21,12 +21,15 @@ import (
"errors"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
api "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
var _ volume.NodeExpandableVolumePlugin = &csiPlugin{}
@ -123,7 +126,26 @@ func (c *csiPlugin) nodeExpandWithClient(
_, err = csClient.NodeExpandVolume(ctx, opts)
if err != nil {
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err)
if inUseError(err) {
failedConditionErr := fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", volumetypes.NewFailedPreconditionError(err.Error()))
return false, failedConditionErr
}
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %w", err)
}
return true, nil
}
func inUseError(err error) bool {
st, ok := status.FromError(err)
if !ok {
// not a grpc error
return false
}
// if this is a failed precondition error then that means driver does not support expansion
// of in-use volumes
// More info - https://github.com/container-storage-interface/spec/blob/master/spec.md#controllerexpandvolume-errors
if st.Code() == codes.FailedPrecondition {
return true
}
return false
}

View File

@ -20,19 +20,24 @@ import (
"os"
"testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
func TestNodeExpand(t *testing.T) {
tests := []struct {
name string
nodeExpansion bool
nodeStageSet bool
volumePhase volume.CSIVolumePhaseType
success bool
fsVolume bool
deviceStagePath string
name string
nodeExpansion bool
nodeStageSet bool
volumePhase volume.CSIVolumePhaseType
success bool
fsVolume bool
grpcError error
hasVolumeInUseError bool
deviceStagePath string
}{
{
name: "when node expansion is not set",
@ -76,6 +81,26 @@ func TestNodeExpand(t *testing.T) {
success: true,
fsVolume: false,
},
{
name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has grpc volume-in-use error",
nodeExpansion: true,
nodeStageSet: true,
volumePhase: volume.CSIVolumePublished,
success: false,
fsVolume: true,
grpcError: status.Error(codes.FailedPrecondition, "volume-in-use"),
hasVolumeInUseError: true,
},
{
name: "when nodeExpansion=on, nodeStage=on, volumePhase=published has other grpc error",
nodeExpansion: true,
nodeStageSet: true,
volumePhase: volume.CSIVolumePublished,
success: false,
fsVolume: true,
grpcError: status.Error(codes.InvalidArgument, "invalid-argument"),
hasVolumeInUseError: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
@ -99,8 +124,19 @@ func TestNodeExpand(t *testing.T) {
fakeCSIClient, _ := csClient.(*fakeCsiDriverClient)
fakeNodeClient := fakeCSIClient.nodeClient
if tc.grpcError != nil {
fakeNodeClient.SetNextError(tc.grpcError)
}
ok, err := plug.nodeExpandWithClient(resizeOptions, csiSource, csClient, tc.fsVolume)
if tc.hasVolumeInUseError {
if !volumetypes.IsFailedPreconditionError(err) {
t.Errorf("expected failed precondition error got: %v", err)
}
}
// verify device staging targer path
stagingTargetPath := fakeNodeClient.FakeNodeExpansionRequest.GetStagingTargetPath()
if tc.deviceStagePath != "" && tc.deviceStagePath != stagingTargetPath {

View File

@ -89,6 +89,9 @@ const (
// SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail
SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name"
// FailWithInUseVolumeName will cause NodeExpandVolume to result in FailedPrecondition error
FailWithInUseVolumeName = "fail-expansion-in-use"
deviceNotMounted = "deviceNotMounted"
deviceMountUncertain = "deviceMountUncertain"
deviceMounted = "deviceMounted"
@ -658,6 +661,9 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
}
func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) {
if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName {
return false, volumetypes.NewFailedPreconditionError("volume-in-use")
}
return true, nil
}

View File

@ -202,6 +202,10 @@ type ActualStateOfWorldMounterUpdater interface {
// GetVolumeMountState returns mount state of the volume for the Pod
GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState
// MarkForInUseExpansionError marks the volume to have in-use error during expansion.
// volume expansion must not be retried for this volume
MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
}
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the

View File

@ -613,7 +613,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// NodeExpandVolume will resize the file system if user has requested a resize of
// underlying persistent volume and is allowed to do so.
resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, resizeOptions)
resizeDone, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
@ -669,7 +669,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// - Volume does not support DeviceMounter interface.
// - In case of CSI the volume does not have node stage_unstage capability.
if !resizeDone {
_, resizeError = og.nodeExpandVolume(volumeToMount, resizeOptions)
_, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
@ -1083,7 +1083,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
DeviceStagePath: stagingPath,
CSIVolumePhase: volume.CSIVolumePublished,
}
_, resizeError := og.nodeExpandVolume(volumeToMount, resizeOptions)
_, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil {
klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
@ -1587,10 +1587,10 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
resizeOptions volume.NodeResizeOptions) (bool, error, error) {
resizeDone, err := og.nodeExpandVolume(volumeToMount, resizeOptions)
resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if err != nil {
klog.Errorf("NodeExpandVolume.NodeExpandVolume failed : %v", err)
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err)
klog.Errorf(e2.Error())
return false, e1, e2
}
if resizeDone {
@ -1605,7 +1605,10 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, nil, nil
}
func (og *operationGenerator) nodeExpandVolume(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) {
func (og *operationGenerator) nodeExpandVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
rsOpts volume.NodeResizeOptions) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName)
return true, nil
@ -1649,7 +1652,15 @@ func (og *operationGenerator) nodeExpandVolume(volumeToMount VolumeToMount, rsOp
rsOpts.OldSize = pvcStatusCap
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
return false, fmt.Errorf("MountVolume.NodeExpandVolume failed : %v", resizeErr)
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return true, nil
}
return false, resizeErr
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before

View File

@ -54,6 +54,28 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
return o.OperationFunc()
}
// FailedPrecondition error indicates CSI operation returned failed precondition
// error
type FailedPrecondition struct {
msg string
}
func (err *FailedPrecondition) Error() string {
return err.msg
}
// NewFailedPreconditionError returns a new FailedPrecondition error instance
func NewFailedPreconditionError(msg string) *FailedPrecondition {
return &FailedPrecondition{msg: msg}
}
// IsFailedPreconditionError checks if given error is of type that indicates
// operation failed with precondition
func IsFailedPreconditionError(err error) bool {
var failedPreconditionError *FailedPrecondition
return errors.As(err, &failedPreconditionError)
}
// TransientOperationFailure indicates operation failed with a transient error
// and may fix itself when retried.
type TransientOperationFailure struct {

View File

@ -23,28 +23,55 @@ import (
"k8s.io/utils/mount"
)
func TestIsFilesystemMismatchError(t *testing.T) {
func TestErrorTypes(t *testing.T) {
tests := []struct {
mountError error
expectError bool
name string
realError error
errorCheckFunc func(error) bool
expectError bool
}{
{
"when mount error has File system mismatch errors",
mount.NewMountError(mount.FilesystemMismatch, "filesystem mismatch"),
IsFilesystemMismatchError,
true,
},
{
"when mount error has other error",
mount.NewMountError(mount.FormatFailed, "filesystem mismatch"),
IsFilesystemMismatchError,
false,
},
{
"when mount error wraps filesystem mismatch error",
fmt.Errorf("mount failed %w", mount.NewMountError(mount.FilesystemMismatch, "filesystem mismatch")),
IsFilesystemMismatchError,
true,
},
{
"when error has no failedPrecondition error",
fmt.Errorf("some other error"),
IsFailedPreconditionError,
false,
},
{
"when error has failedPrecondition error",
NewFailedPreconditionError("volume-in-use"),
IsFailedPreconditionError,
true,
},
{
"when error wraps failedPrecondition error",
fmt.Errorf("volume readonly %w", NewFailedPreconditionError("volume-in-use-error")),
IsFailedPreconditionError,
true,
},
}
for _, test := range tests {
ok := IsFilesystemMismatchError(test.mountError)
ok := test.errorCheckFunc(test.realError)
if ok != test.expectError {
t.Errorf("expected filesystem mismatch to be %v but got %v", test.expectError, ok)
t.Errorf("for %s: expected error to be %v but got %v", test.name, test.expectError, ok)
}
}
}