From 785e68a626634e01e15f3864d3fe92c7ec2b64bb Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 15 Jul 2024 15:20:35 -0400 Subject: [PATCH] Add code for testing final errors --- pkg/volume/testing/testing.go | 11 +- .../util/operationexecutor/node_expander.go | 27 +++- .../operationexecutor/node_expander_test.go | 146 +++++++++++++++++- .../operation_generator_test.go | 7 +- pkg/volume/util/resize_util.go | 35 ++++- 5 files changed, 211 insertions(+), 15 deletions(-) diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 1710dc63f32..cd7c654610d 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -88,7 +88,8 @@ const ( FailVolumeExpansion = "fail-expansion-test" - AlwaysFailNodeExpansion = "always-fail-node-expansion" + InfeasibleNodeExpansion = "infeasible-fail-node-expansion" + OtherFinalNodeExpansionError = "other-final-node-expansion-error" deviceNotMounted = "deviceNotMounted" deviceMountUncertain = "deviceMountUncertain" @@ -500,8 +501,12 @@ func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions volume.NodeResizeOption return false, volumetypes.NewOperationNotSupportedError("volume-unsupported") } - if resizeOptions.VolumeSpec.Name() == AlwaysFailNodeExpansion { - return false, fmt.Errorf("test failure: NodeExpand") + if resizeOptions.VolumeSpec.Name() == InfeasibleNodeExpansion { + return false, volumetypes.NewInfeasibleError("infeasible-expansion") + } + + if resizeOptions.VolumeSpec.Name() == OtherFinalNodeExpansionError { + return false, fmt.Errorf("other-final-node-expansion-error") } if resizeOptions.VolumeSpec.Name() == FailVolumeExpansion { diff --git a/pkg/volume/util/operationexecutor/node_expander.go b/pkg/volume/util/operationexecutor/node_expander.go index cf9c57504e8..8e55d0b771c 100644 --- a/pkg/volume/util/operationexecutor/node_expander.go +++ b/pkg/volume/util/operationexecutor/node_expander.go @@ -39,6 +39,10 @@ type NodeExpander struct { pvCap resource.Quantity resizeStatus v1.ClaimResourceStatus + // indicates that pvc spec size has actually changed since last we observerd size + // on the kubelet + markExpansionInfeasible bool + // pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet // PVC has already been updated - possibly because expansion already succeeded on different node. // This can happen when a RWX PVC is expanded. @@ -79,6 +83,15 @@ func (ne *NodeExpander) runPreCheck() bool { ne.resizeStatus = currentStatus } + pvcSpecCap := ne.pvc.Spec.Resources.Requests[v1.ResourceStorage] + // usually when are performing node expansion, we expect pv size and pvc spec size + // to be the same, but if user has edited pvc since then and volume expansion failed + // with final error, then we should let controller reconcile this state. + if pvcSpecCap.Cmp(ne.pluginResizeOpts.NewSize) != 0 && + ne.actualStateOfWorld.CheckVolumeInFailedExpansionWithFinalErrors(ne.vmt.VolumeName) { + ne.markExpansionInfeasible = true + } + // PVC is already expanded but we are still trying to expand the volume because // last recorded size in ASOW is older. This can happen for RWX volume types. if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && ne.resizeStatus == "" { @@ -124,9 +137,17 @@ func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) { if resizeErr != nil { if volumetypes.IsOperationFinishedError(resizeErr) { var markFailedError error - ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient) - if markFailedError != nil { - klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + ne.actualStateOfWorld.MarkVolumeExpansionFailedWithFinalError(ne.vmt.VolumeName) + if volumetypes.IsInfeasibleError(resizeErr) || ne.markExpansionInfeasible { + ne.pvc, markFailedError = util.MarkNodeExpansionInfeasible(ne.pvc, ne.kubeClient, resizeErr) + if markFailedError != nil { + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + } + } else { + ne.pvc, markFailedError = util.MarkNodeExpansionFailedCondition(ne.pvc, ne.kubeClient, resizeErr) + if markFailedError != nil { + klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error()) + } } } diff --git a/pkg/volume/util/operationexecutor/node_expander_test.go b/pkg/volume/util/operationexecutor/node_expander_test.go index 09d1620fd52..87d029004bc 100644 --- a/pkg/volume/util/operationexecutor/node_expander_test.go +++ b/pkg/volume/util/operationexecutor/node_expander_test.go @@ -17,17 +17,37 @@ limitations under the License. package operationexecutor import ( + "sync" "testing" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) +type fakeActualStateOfWorld struct { + // nodeName is the name of this node. This value is passed to Attach/Detach + nodeName types.NodeName + + volumesWithFinalExpansionErrors sets.Set[v1.UniqueVolumeName] + sync.RWMutex +} + +var _ ActualStateOfWorldMounterUpdater = &fakeActualStateOfWorld{} + +func newFakeActualStateOfWorld() *fakeActualStateOfWorld { + return &fakeActualStateOfWorld{ + volumesWithFinalExpansionErrors: sets.New[v1.UniqueVolumeName](), + } +} + func TestNodeExpander(t *testing.T) { nodeResizeFailed := v1.PersistentVolumeClaimNodeResizeInfeasible @@ -46,6 +66,7 @@ func TestNodeExpander(t *testing.T) { expectedResizeStatus v1.ClaimResourceStatus expectedStatusSize resource.Quantity expectResizeCall bool + expectFinalErrors bool assumeResizeOpAsFinished bool expectError bool }{ @@ -57,6 +78,7 @@ func TestNodeExpander(t *testing.T) { expectedResizeStatus: nodeResizeFailed, expectResizeCall: false, assumeResizeOpAsFinished: true, + expectFinalErrors: false, expectedStatusSize: resource.MustParse("1G"), }, { @@ -66,16 +88,29 @@ func TestNodeExpander(t *testing.T) { expectedResizeStatus: "", expectResizeCall: true, assumeResizeOpAsFinished: true, + expectFinalErrors: false, expectedStatusSize: resource.MustParse("2G"), }, { - name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", - pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", &nodeResizePending), - pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=infeasible", + pvc: getTestPVC(volumetesting.InfeasibleNodeExpansion, "2G", "1G", "2G", &nodeResizePending), + pv: getTestPV(volumetesting.InfeasibleNodeExpansion, "2G"), expectError: true, expectedResizeStatus: nodeResizeFailed, expectResizeCall: true, assumeResizeOpAsFinished: true, + expectFinalErrors: true, + expectedStatusSize: resource.MustParse("1G"), + }, + { + name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", + pvc: getTestPVC(volumetesting.OtherFinalNodeExpansionError, "2G", "1G", "2G", &nodeResizePending), + pv: getTestPV(volumetesting.OtherFinalNodeExpansionError, "2G"), + expectError: true, + expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInProgress, + expectResizeCall: true, + assumeResizeOpAsFinished: true, + expectFinalErrors: true, expectedStatusSize: resource.MustParse("1G"), }, { @@ -86,6 +121,7 @@ func TestNodeExpander(t *testing.T) { expectedResizeStatus: "", expectResizeCall: true, assumeResizeOpAsFinished: true, + expectFinalErrors: false, expectedStatusSize: resource.MustParse("2G"), }, } @@ -114,12 +150,13 @@ func TestNodeExpander(t *testing.T) { if actualSize == nil { actualSize = pvc.Status.Capacity.Storage() } + asow := newFakeActualStateOfWorld() resizeOp := nodeResizeOperationOpts{ pvc: pvc, pv: pv, volumePlugin: fakePlugin, vmt: vmt, - actualStateOfWorld: nil, + actualStateOfWorld: asow, pluginResizeOpts: volume.NodeResizeOptions{ VolumeSpec: vmt.VolumeSpec, NewSize: *desiredSize, @@ -153,9 +190,110 @@ func TestNodeExpander(t *testing.T) { if test.expectedResizeStatus != resizeStatus { t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, resizeStatus) } + + if test.expectFinalErrors != asow.CheckVolumeInFailedExpansionWithFinalErrors(vmt.VolumeName) { + t.Errorf("For test %s, expected final errors %t, got %t", test.name, test.expectFinalErrors, !test.expectFinalErrors) + } if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 { t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String()) } }) } } + +// CheckAndMarkDeviceUncertainViaReconstruction implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool { + panic("unimplemented") +} + +// CheckAndMarkVolumeAsUncertainViaReconstruction implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error) { + panic("unimplemented") +} + +// CheckVolumeInFailedExpansionWithFinalErrors implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) CheckVolumeInFailedExpansionWithFinalErrors(volumeName v1.UniqueVolumeName) bool { + f.RLock() + defer f.RUnlock() + return f.volumesWithFinalExpansionErrors.Has(volumeName) +} + +// GetDeviceMountState implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState { + panic("unimplemented") +} + +// GetVolumeMountState implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState { + panic("unimplemented") +} + +// IsVolumeDeviceReconstructed implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) IsVolumeDeviceReconstructed(volumeName v1.UniqueVolumeName) bool { + panic("unimplemented") +} + +// IsVolumeMountedElsewhere implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + panic("unimplemented") +} + +// IsVolumeReconstructed implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + panic("unimplemented") +} + +// MarkDeviceAsMounted implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string, seLinuxMountContext string) error { + panic("unimplemented") +} + +// MarkDeviceAsUncertain implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string, seLinuxMountContext string) error { + panic("unimplemented") +} + +// MarkDeviceAsUnmounted implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error { + panic("unimplemented") +} + +// MarkForInUseExpansionError implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) { + panic("unimplemented") +} + +// MarkVolumeAsMounted implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error { + panic("unimplemented") +} + +// MarkVolumeAsResized implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool { + panic("unimplemented") +} + +// MarkVolumeAsUnmounted implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error { + panic("unimplemented") +} + +// MarkVolumeExpansionFailedWithFinalError implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkVolumeExpansionFailedWithFinalError(volumeName v1.UniqueVolumeName) { + f.Lock() + defer f.Unlock() + + f.volumesWithFinalExpansionErrors.Insert(volumeName) +} + +// MarkVolumeMountAsUncertain implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error { + panic("unimplemented") +} + +// RemoveVolumeFromFailedWithFinalErrors implements ActualStateOfWorldMounterUpdater. +func (f *fakeActualStateOfWorld) RemoveVolumeFromFailedWithFinalErrors(volumeName v1.UniqueVolumeName) { + f.Lock() + defer f.Unlock() + f.volumesWithFinalExpansionErrors.Delete(volumeName) +} diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go index 6fc4497361d..2835941cb30 100644 --- a/pkg/volume/util/operationexecutor/operation_generator_test.go +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -231,8 +231,8 @@ func TestOperationGenerator_nodeExpandVolume(t *testing.T) { }, { name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing", - pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", &nodeResizePending), - pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"), + pvc: getTestPVC(volumetesting.InfeasibleNodeExpansion, "2G", "1G", "2G", &nodeResizePending), + pv: getTestPV(volumetesting.InfeasibleNodeExpansion, "2G"), expectError: true, expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInfeasible, resizeCallCount: 1, @@ -302,9 +302,10 @@ func TestOperationGenerator_nodeExpandVolume(t *testing.T) { NewSize: *desiredSize, OldSize: *actualSize, } + asow := newFakeActualStateOfWorld() ogInstance, _ := og.(*operationGenerator) - _, err := ogInstance.nodeExpandVolume(vmt, nil, pluginResizeOpts) + _, err := ogInstance.nodeExpandVolume(vmt, asow, pluginResizeOpts) if !test.expectError && err != nil { t.Errorf("For test %s, expected no error got: %v", test.name, err) diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index aaa93d7b1e5..6279ef9a39a 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -40,6 +40,8 @@ var ( knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{ v1.PersistentVolumeClaimFileSystemResizePending: true, v1.PersistentVolumeClaimResizing: true, + v1.PersistentVolumeClaimControllerResizeError: true, + v1.PersistentVolumeClaimNodeResizeError: true, } // AnnPreResizeCapacity annotation is added to a PV when expanding volume. @@ -234,11 +236,18 @@ func MarkFSResizeFinished( return updatedPVC, err } -// MarkNodeExpansionFailed marks a PVC for node expansion as failed. Kubelet should not retry expansion +// MarkNodeExpansionInfeasible marks a PVC for node expansion as failed. Kubelet should not retry expansion // of volumes which are in failed state. -func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { +func MarkNodeExpansionInfeasible(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface, err error) (*v1.PersistentVolumeClaim, error) { newPVC := pvc.DeepCopy() newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeInfeasible) + errorCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimNodeResizeError, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Message: fmt.Sprintf("failed to expand pvc with %v", err), + } + newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{errorCondition}) patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */) if err != nil { @@ -253,6 +262,28 @@ func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset return updatedClaim, nil } +func MarkNodeExpansionFailedCondition(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface, err error) (*v1.PersistentVolumeClaim, error) { + newPVC := pvc.DeepCopy() + errorCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimNodeResizeError, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Message: fmt.Sprintf("failed to expand pvc with %v", err), + } + newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{errorCondition}) + patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */) + if err != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err) + } + + updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace). + Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if updateErr != nil { + return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr) + } + return updatedClaim, nil +} + // MarkNodeExpansionInProgress marks pvc expansion in progress on node func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { newPVC := pvc.DeepCopy()