Add code for testing final errors

This commit is contained in:
Hemant Kumar
2024-07-15 15:20:35 -04:00
parent c8d9863a3e
commit 785e68a626
5 changed files with 211 additions and 15 deletions

View File

@@ -88,7 +88,8 @@ const (
FailVolumeExpansion = "fail-expansion-test"
AlwaysFailNodeExpansion = "always-fail-node-expansion"
InfeasibleNodeExpansion = "infeasible-fail-node-expansion"
OtherFinalNodeExpansionError = "other-final-node-expansion-error"
deviceNotMounted = "deviceNotMounted"
deviceMountUncertain = "deviceMountUncertain"
@@ -500,8 +501,12 @@ func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions volume.NodeResizeOption
return false, volumetypes.NewOperationNotSupportedError("volume-unsupported")
}
if resizeOptions.VolumeSpec.Name() == AlwaysFailNodeExpansion {
return false, fmt.Errorf("test failure: NodeExpand")
if resizeOptions.VolumeSpec.Name() == InfeasibleNodeExpansion {
return false, volumetypes.NewInfeasibleError("infeasible-expansion")
}
if resizeOptions.VolumeSpec.Name() == OtherFinalNodeExpansionError {
return false, fmt.Errorf("other-final-node-expansion-error")
}
if resizeOptions.VolumeSpec.Name() == FailVolumeExpansion {

View File

@@ -39,6 +39,10 @@ type NodeExpander struct {
pvCap resource.Quantity
resizeStatus v1.ClaimResourceStatus
// indicates that pvc spec size has actually changed since last we observerd size
// on the kubelet
markExpansionInfeasible bool
// pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet
// PVC has already been updated - possibly because expansion already succeeded on different node.
// This can happen when a RWX PVC is expanded.
@@ -79,6 +83,15 @@ func (ne *NodeExpander) runPreCheck() bool {
ne.resizeStatus = currentStatus
}
pvcSpecCap := ne.pvc.Spec.Resources.Requests[v1.ResourceStorage]
// usually when are performing node expansion, we expect pv size and pvc spec size
// to be the same, but if user has edited pvc since then and volume expansion failed
// with final error, then we should let controller reconcile this state.
if pvcSpecCap.Cmp(ne.pluginResizeOpts.NewSize) != 0 &&
ne.actualStateOfWorld.CheckVolumeInFailedExpansionWithFinalErrors(ne.vmt.VolumeName) {
ne.markExpansionInfeasible = true
}
// PVC is already expanded but we are still trying to expand the volume because
// last recorded size in ASOW is older. This can happen for RWX volume types.
if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && ne.resizeStatus == "" {
@@ -124,9 +137,17 @@ func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient)
if markFailedError != nil {
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
ne.actualStateOfWorld.MarkVolumeExpansionFailedWithFinalError(ne.vmt.VolumeName)
if volumetypes.IsInfeasibleError(resizeErr) || ne.markExpansionInfeasible {
ne.pvc, markFailedError = util.MarkNodeExpansionInfeasible(ne.pvc, ne.kubeClient, resizeErr)
if markFailedError != nil {
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
} else {
ne.pvc, markFailedError = util.MarkNodeExpansionFailedCondition(ne.pvc, ne.kubeClient, resizeErr)
if markFailedError != nil {
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
}

View File

@@ -17,17 +17,37 @@ limitations under the License.
package operationexecutor
import (
"sync"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type fakeActualStateOfWorld struct {
// nodeName is the name of this node. This value is passed to Attach/Detach
nodeName types.NodeName
volumesWithFinalExpansionErrors sets.Set[v1.UniqueVolumeName]
sync.RWMutex
}
var _ ActualStateOfWorldMounterUpdater = &fakeActualStateOfWorld{}
func newFakeActualStateOfWorld() *fakeActualStateOfWorld {
return &fakeActualStateOfWorld{
volumesWithFinalExpansionErrors: sets.New[v1.UniqueVolumeName](),
}
}
func TestNodeExpander(t *testing.T) {
nodeResizeFailed := v1.PersistentVolumeClaimNodeResizeInfeasible
@@ -46,6 +66,7 @@ func TestNodeExpander(t *testing.T) {
expectedResizeStatus v1.ClaimResourceStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
expectFinalErrors bool
assumeResizeOpAsFinished bool
expectError bool
}{
@@ -57,6 +78,7 @@ func TestNodeExpander(t *testing.T) {
expectedResizeStatus: nodeResizeFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectFinalErrors: false,
expectedStatusSize: resource.MustParse("1G"),
},
{
@@ -66,16 +88,29 @@ func TestNodeExpander(t *testing.T) {
expectedResizeStatus: "",
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectFinalErrors: false,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", &nodeResizePending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=infeasible",
pvc: getTestPVC(volumetesting.InfeasibleNodeExpansion, "2G", "1G", "2G", &nodeResizePending),
pv: getTestPV(volumetesting.InfeasibleNodeExpansion, "2G"),
expectError: true,
expectedResizeStatus: nodeResizeFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectFinalErrors: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.OtherFinalNodeExpansionError, "2G", "1G", "2G", &nodeResizePending),
pv: getTestPV(volumetesting.OtherFinalNodeExpansionError, "2G"),
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectFinalErrors: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
@@ -86,6 +121,7 @@ func TestNodeExpander(t *testing.T) {
expectedResizeStatus: "",
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectFinalErrors: false,
expectedStatusSize: resource.MustParse("2G"),
},
}
@@ -114,12 +150,13 @@ func TestNodeExpander(t *testing.T) {
if actualSize == nil {
actualSize = pvc.Status.Capacity.Storage()
}
asow := newFakeActualStateOfWorld()
resizeOp := nodeResizeOperationOpts{
pvc: pvc,
pv: pv,
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
actualStateOfWorld: asow,
pluginResizeOpts: volume.NodeResizeOptions{
VolumeSpec: vmt.VolumeSpec,
NewSize: *desiredSize,
@@ -153,9 +190,110 @@ func TestNodeExpander(t *testing.T) {
if test.expectedResizeStatus != resizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, resizeStatus)
}
if test.expectFinalErrors != asow.CheckVolumeInFailedExpansionWithFinalErrors(vmt.VolumeName) {
t.Errorf("For test %s, expected final errors %t, got %t", test.name, test.expectFinalErrors, !test.expectFinalErrors)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
}
})
}
}
// CheckAndMarkDeviceUncertainViaReconstruction implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool {
panic("unimplemented")
}
// CheckAndMarkVolumeAsUncertainViaReconstruction implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error) {
panic("unimplemented")
}
// CheckVolumeInFailedExpansionWithFinalErrors implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) CheckVolumeInFailedExpansionWithFinalErrors(volumeName v1.UniqueVolumeName) bool {
f.RLock()
defer f.RUnlock()
return f.volumesWithFinalExpansionErrors.Has(volumeName)
}
// GetDeviceMountState implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState {
panic("unimplemented")
}
// GetVolumeMountState implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState {
panic("unimplemented")
}
// IsVolumeDeviceReconstructed implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) IsVolumeDeviceReconstructed(volumeName v1.UniqueVolumeName) bool {
panic("unimplemented")
}
// IsVolumeMountedElsewhere implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
panic("unimplemented")
}
// IsVolumeReconstructed implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
panic("unimplemented")
}
// MarkDeviceAsMounted implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string, seLinuxMountContext string) error {
panic("unimplemented")
}
// MarkDeviceAsUncertain implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath string, deviceMountPath string, seLinuxMountContext string) error {
panic("unimplemented")
}
// MarkDeviceAsUnmounted implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error {
panic("unimplemented")
}
// MarkForInUseExpansionError implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) {
panic("unimplemented")
}
// MarkVolumeAsMounted implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error {
panic("unimplemented")
}
// MarkVolumeAsResized implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool {
panic("unimplemented")
}
// MarkVolumeAsUnmounted implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error {
panic("unimplemented")
}
// MarkVolumeExpansionFailedWithFinalError implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeExpansionFailedWithFinalError(volumeName v1.UniqueVolumeName) {
f.Lock()
defer f.Unlock()
f.volumesWithFinalExpansionErrors.Insert(volumeName)
}
// MarkVolumeMountAsUncertain implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error {
panic("unimplemented")
}
// RemoveVolumeFromFailedWithFinalErrors implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) RemoveVolumeFromFailedWithFinalErrors(volumeName v1.UniqueVolumeName) {
f.Lock()
defer f.Unlock()
f.volumesWithFinalExpansionErrors.Delete(volumeName)
}

View File

@@ -231,8 +231,8 @@ func TestOperationGenerator_nodeExpandVolume(t *testing.T) {
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", &nodeResizePending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
pvc: getTestPVC(volumetesting.InfeasibleNodeExpansion, "2G", "1G", "2G", &nodeResizePending),
pv: getTestPV(volumetesting.InfeasibleNodeExpansion, "2G"),
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInfeasible,
resizeCallCount: 1,
@@ -302,9 +302,10 @@ func TestOperationGenerator_nodeExpandVolume(t *testing.T) {
NewSize: *desiredSize,
OldSize: *actualSize,
}
asow := newFakeActualStateOfWorld()
ogInstance, _ := og.(*operationGenerator)
_, err := ogInstance.nodeExpandVolume(vmt, nil, pluginResizeOpts)
_, err := ogInstance.nodeExpandVolume(vmt, asow, pluginResizeOpts)
if !test.expectError && err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, err)

View File

@@ -40,6 +40,8 @@ var (
knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
v1.PersistentVolumeClaimFileSystemResizePending: true,
v1.PersistentVolumeClaimResizing: true,
v1.PersistentVolumeClaimControllerResizeError: true,
v1.PersistentVolumeClaimNodeResizeError: true,
}
// AnnPreResizeCapacity annotation is added to a PV when expanding volume.
@@ -234,11 +236,18 @@ func MarkFSResizeFinished(
return updatedPVC, err
}
// MarkNodeExpansionFailed marks a PVC for node expansion as failed. Kubelet should not retry expansion
// MarkNodeExpansionInfeasible marks a PVC for node expansion as failed. Kubelet should not retry expansion
// of volumes which are in failed state.
func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
func MarkNodeExpansionInfeasible(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface, err error) (*v1.PersistentVolumeClaim, error) {
newPVC := pvc.DeepCopy()
newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeInfeasible)
errorCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimNodeResizeError,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: fmt.Sprintf("failed to expand pvc with %v", err),
}
newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{errorCondition})
patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */)
if err != nil {
@@ -253,6 +262,28 @@ func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset
return updatedClaim, nil
}
func MarkNodeExpansionFailedCondition(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface, err error) (*v1.PersistentVolumeClaim, error) {
newPVC := pvc.DeepCopy()
errorCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimNodeResizeError,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: fmt.Sprintf("failed to expand pvc with %v", err),
}
newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{errorCondition})
patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */)
if err != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).
Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr)
}
return updatedClaim, nil
}
// MarkNodeExpansionInProgress marks pvc expansion in progress on node
func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
newPVC := pvc.DeepCopy()