Merge pull request #126600 from gnufied/update-e2e-expansion

Use allocatedResources as new size for finishing volume expansion on the node
This commit is contained in:
Kubernetes Prow Robot 2024-08-19 14:15:20 -07:00 committed by GitHub
commit 5c14a57795
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 218 additions and 66 deletions

View File

@ -960,6 +960,8 @@ func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, de
return currentSize, false return currentSize, false
} }
klog.V(5).InfoS("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName)
if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 { if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 {
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil { if err != nil || volumePlugin == nil {

View File

@ -380,10 +380,11 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name) klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
return return
} }
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage()
pvcStatusCap := pvc.Status.Capacity.Storage() pvcStatusCap := pvc.Status.Capacity.Storage()
dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap) dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap)
klog.V(5).InfoS("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName)
// in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value // in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value
dswp.actualStateOfWorld.InitializeClaimSize(klog.TODO(), uniqueVolumeName, pvcStatusCap) dswp.actualStateOfWorld.InitializeClaimSize(klog.TODO(), uniqueVolumeName, pvcStatusCap)
} }

View File

@ -36,7 +36,6 @@ type NodeExpander struct {
// computed via precheck // computed via precheck
pvcStatusCap resource.Quantity pvcStatusCap resource.Quantity
pvCap resource.Quantity
resizeStatus v1.ClaimResourceStatus resizeStatus v1.ClaimResourceStatus
// indicates that if volume expansion failed on the node, then current expansion should be marked // indicates that if volume expansion failed on the node, then current expansion should be marked
@ -47,6 +46,9 @@ type NodeExpander struct {
// PVC has already been updated - possibly because expansion already succeeded on different node. // PVC has already been updated - possibly because expansion already succeeded on different node.
// This can happen when a RWX PVC is expanded. // This can happen when a RWX PVC is expanded.
pvcAlreadyUpdated bool pvcAlreadyUpdated bool
// testStatus is used for testing purposes only.
testStatus testResponseData
} }
func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander { func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander {
@ -76,7 +78,6 @@ type testResponseData struct {
// it returns false. // it returns false.
func (ne *NodeExpander) runPreCheck() bool { func (ne *NodeExpander) runPreCheck() bool {
ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage] ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage]
ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage]
allocatedResourceStatus := ne.pvc.Status.AllocatedResourceStatuses allocatedResourceStatus := ne.pvc.Status.AllocatedResourceStatuses
if currentStatus, ok := allocatedResourceStatus[v1.ResourceStorage]; ok { if currentStatus, ok := allocatedResourceStatus[v1.ResourceStorage]; ok {
@ -117,10 +118,12 @@ func (ne *NodeExpander) runPreCheck() bool {
return false return false
} }
func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) { func (ne *NodeExpander) expandOnPlugin() (bool, resource.Quantity, error) {
allowExpansion := ne.runPreCheck() allowExpansion := ne.runPreCheck()
if !allowExpansion { if !allowExpansion {
return false, nil, testResponseData{false, true} klog.V(3).Infof("NodeExpandVolume is not allowed to proceed for volume %s with resizeStatus %s", ne.vmt.VolumeName, ne.resizeStatus)
ne.testStatus = testResponseData{false /* resizeCalledOnPlugin */, true /* assumeResizeFinished */}
return false, ne.pluginResizeOpts.OldSize, nil
} }
var err error var err error
@ -132,7 +135,8 @@ func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
if err != nil { if err != nil {
msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err) msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error()) klog.Errorf(msg.Error())
return false, err, testResponseData{} ne.testStatus = testResponseData{}
return false, ne.pluginResizeOpts.OldSize, err
} }
} }
_, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts) _, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts)
@ -159,24 +163,27 @@ func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
if volumetypes.IsFailedPreconditionError(resizeErr) { if volumetypes.IsFailedPreconditionError(resizeErr) {
ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName) ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName)
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} ne.testStatus = testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
return false, ne.pluginResizeOpts.OldSize, nil
} }
return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true} ne.testStatus = testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
return false, ne.pluginResizeOpts.OldSize, resizeErr
} }
simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName) simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg) ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod)) klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod))
ne.testStatus = testResponseData{true /*resizeCalledOnPlugin */, true /* assumeResizeFinished */}
// no need to update PVC object if we already updated it // no need to update PVC object if we already updated it
if ne.pvcAlreadyUpdated { if ne.pvcAlreadyUpdated {
return true, nil, testResponseData{true, true} return true, ne.pluginResizeOpts.NewSize, nil
} }
// File system resize succeeded, now update the PVC's Capacity to match the PV's // File system resize succeeded, now update the PVC's Capacity to match the PV's
ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient) ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient)
if err != nil { if err != nil {
return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), testResponseData{true, true} return true, ne.pluginResizeOpts.NewSize, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %w", err)
} }
return true, nil, testResponseData{true, true} return true, ne.pluginResizeOpts.NewSize, nil
} }

View File

@ -162,7 +162,8 @@ func TestNodeExpander(t *testing.T) {
ogInstance, _ := og.(*operationGenerator) ogInstance, _ := og.(*operationGenerator)
nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder) nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder)
_, err, expansionResponse := nodeExpander.expandOnPlugin() _, _, err := nodeExpander.expandOnPlugin()
expansionResponse := nodeExpander.testStatus
pvc = nodeExpander.pvc pvc = nodeExpander.pvc
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]

View File

@ -1957,14 +1957,14 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater, actualStateOfWorld ActualStateOfWorldMounterUpdater,
resizeOptions volume.NodeResizeOptions) (bool, error, error) { resizeOptions volume.NodeResizeOptions) (bool, error, error) {
resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) resizeDone, newSize, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
if err != nil { if err != nil {
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err) e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err)
klog.Errorf(e2.Error()) klog.Errorf(e2.Error())
return false, e1, e2 return false, e1, e2
} }
if resizeDone { if resizeDone {
markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize) markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &newSize)
if !markingDone { if !markingDone {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
genericFailureError := fmt.Errorf("unable to mark volume as resized") genericFailureError := fmt.Errorf("unable to mark volume as resized")
@ -2009,8 +2009,11 @@ func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMoun
actualStateOfWorld: actualStateOfWorld, actualStateOfWorld: actualStateOfWorld,
} }
if og.checkForRecoveryFromExpansion(pvc, volumeToMount) { if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
// if recovery feature is enabled, we can use allocated size from PVC status as new size
rsOpts.NewSize = pvc.Status.AllocatedResources[v1.ResourceStorage]
resizeOp.pluginResizeOpts = rsOpts
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin() resizeFinished, _, err := nodeExpander.expandOnPlugin()
return resizeFinished, err return resizeFinished, err
} else { } else {
return og.legacyCallNodeExpandOnPlugin(resizeOp) return og.legacyCallNodeExpandOnPlugin(resizeOp)
@ -2041,7 +2044,7 @@ func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeT
func (og *operationGenerator) nodeExpandVolume( func (og *operationGenerator) nodeExpandVolume(
volumeToMount VolumeToMount, volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater, actualStateOfWorld ActualStateOfWorldMounterUpdater,
rsOpts volume.NodeResizeOptions) (bool, error) { rsOpts volume.NodeResizeOptions) (bool, resource.Quantity, error) {
supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount) supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
@ -2050,9 +2053,10 @@ func (og *operationGenerator) nodeExpandVolume(
if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 { if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 {
pv := volumeToMount.VolumeSpec.PersistentVolume pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
currentSize := pvc.Status.Capacity.Storage()
if err != nil { if err != nil {
// Return error rather than leave the file system un-resized, caller will log and retry // Return error rather than leave the file system un-resized, caller will log and retry
return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err) return false, *currentSize, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %w", err)
} }
if volumeToMount.VolumeSpec.ReadOnly { if volumeToMount.VolumeSpec.ReadOnly {
@ -2060,7 +2064,7 @@ func (og *operationGenerator) nodeExpandVolume(
klog.Warningf(detailedMsg) klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil return true, *currentSize, nil
} }
resizeOp := nodeResizeOperationOpts{ resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount, vmt: volumeToMount,
@ -2072,15 +2076,20 @@ func (og *operationGenerator) nodeExpandVolume(
} }
if og.checkForRecoveryFromExpansion(pvc, volumeToMount) { if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
// if recovery feature is enabled, we can use allocated size from PVC status as new size
newSize := pvc.Status.AllocatedResources[v1.ResourceStorage]
rsOpts.NewSize = newSize
resizeOp.pluginResizeOpts.NewSize = newSize
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder) nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin() resizeFinished, newSize, err := nodeExpander.expandOnPlugin()
return resizeFinished, err return resizeFinished, newSize, err
} else { } else {
return og.legacyCallNodeExpandOnPlugin(resizeOp) resizeFinished, err := og.legacyCallNodeExpandOnPlugin(resizeOp)
return resizeFinished, rsOpts.NewSize, err
} }
} }
} }
return true, nil return true, rsOpts.OldSize, nil
} }
func (og *operationGenerator) checkForRecoveryFromExpansion(pvc *v1.PersistentVolumeClaim, volumeToMount VolumeToMount) bool { func (og *operationGenerator) checkForRecoveryFromExpansion(pvc *v1.PersistentVolumeClaim, volumeToMount VolumeToMount) bool {

View File

@ -305,7 +305,7 @@ func TestOperationGenerator_nodeExpandVolume(t *testing.T) {
asow := newFakeActualStateOfWorld() asow := newFakeActualStateOfWorld()
ogInstance, _ := og.(*operationGenerator) ogInstance, _ := og.(*operationGenerator)
_, err := ogInstance.nodeExpandVolume(vmt, asow, pluginResizeOpts) _, _, err := ogInstance.nodeExpandVolume(vmt, asow, pluginResizeOpts)
if !test.expectError && err != nil { if !test.expectError && err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, err) t.Errorf("For test %s, expected no error got: %v", test.name, err)

View File

@ -96,6 +96,7 @@ type testParameters struct {
enableNodeExpansion bool // enable node expansion for CSI mock driver enableNodeExpansion bool // enable node expansion for CSI mock driver
// just disable resizing on driver it overrides enableResizing flag for CSI mock driver // just disable resizing on driver it overrides enableResizing flag for CSI mock driver
disableResizingOnDriver bool disableResizingOnDriver bool
disableControllerExpansion bool
enableSnapshot bool enableSnapshot bool
enableVolumeMountGroup bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver. enableVolumeMountGroup bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
enableNodeVolumeCondition bool enableNodeVolumeCondition bool
@ -172,6 +173,7 @@ func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
EnableResizing: tp.enableResizing, EnableResizing: tp.enableResizing,
EnableNodeExpansion: tp.enableNodeExpansion, EnableNodeExpansion: tp.enableNodeExpansion,
EnableNodeVolumeCondition: tp.enableNodeVolumeCondition, EnableNodeVolumeCondition: tp.enableNodeVolumeCondition,
DisableControllerExpansion: tp.disableControllerExpansion,
EnableSnapshot: tp.enableSnapshot, EnableSnapshot: tp.enableSnapshot,
EnableVolumeMountGroup: tp.enableVolumeMountGroup, EnableVolumeMountGroup: tp.enableVolumeMountGroup,
TokenRequests: tp.tokenRequests, TokenRequests: tp.tokenRequests,

View File

@ -45,8 +45,10 @@ type expansionStatus int
const ( const (
expansionSuccess = iota expansionSuccess = iota
expansionFailedOnController expansionFailedOnControllerWithInfeasibleError
expansionFailedOnNode expansionFailedOnControllerWithFinalError
expansionFailedOnNodeWithInfeasibleError
expansionFailedOnNodeWithFinalError
expansionFailedMissingStagingPath expansionFailedMissingStagingPath
) )
@ -65,6 +67,7 @@ type recoveryTest struct {
pvcRequestSize string pvcRequestSize string
allocatedResource string allocatedResource string
simulatedCSIDriverError expansionStatus simulatedCSIDriverError expansionStatus
disableControllerExpansion bool
expectedResizeStatus v1.ClaimResourceStatus expectedResizeStatus v1.ClaimResourceStatus
recoverySize resource.Quantity recoverySize resource.Quantity
} }
@ -403,32 +406,62 @@ var _ = utils.SIGDescribe("CSI Mock volume expansion", func() {
name: "should record target size in allocated resources", name: "should record target size in allocated resources",
pvcRequestSize: "4Gi", pvcRequestSize: "4Gi",
allocatedResource: "4Gi", allocatedResource: "4Gi",
disableControllerExpansion: false,
simulatedCSIDriverError: expansionSuccess, simulatedCSIDriverError: expansionSuccess,
expectedResizeStatus: "", expectedResizeStatus: "",
}, },
{
name: "should allow recovery if controller expansion fails with infeasible error",
pvcRequestSize: "11Gi", // expansion to 11Gi will cause expansion to fail on controller
allocatedResource: "11Gi",
disableControllerExpansion: false,
simulatedCSIDriverError: expansionFailedOnControllerWithInfeasibleError,
expectedResizeStatus: v1.PersistentVolumeClaimControllerResizeInfeasible,
recoverySize: resource.MustParse("4Gi"),
},
{ {
name: "should allow recovery if controller expansion fails with final error", name: "should allow recovery if controller expansion fails with final error",
pvcRequestSize: "11Gi", // expansion to 11Gi will cause expansion to fail on controller pvcRequestSize: "11Gi", // expansion to 11Gi will cause expansion to fail on controller
allocatedResource: "11Gi", allocatedResource: "11Gi",
simulatedCSIDriverError: expansionFailedOnController, disableControllerExpansion: false,
expectedResizeStatus: v1.PersistentVolumeClaimControllerResizeInfeasible, simulatedCSIDriverError: expansionFailedOnControllerWithFinalError,
expectedResizeStatus: v1.PersistentVolumeClaimControllerResizeInProgress,
recoverySize: resource.MustParse("4Gi"), recoverySize: resource.MustParse("4Gi"),
}, },
{ {
name: "recovery should not be possible in partially expanded volumes", name: "recovery should not be possible in partially expanded volumes",
pvcRequestSize: "9Gi", // expansion to 9Gi will cause expansion to fail on node pvcRequestSize: "9Gi", // expansion to 9Gi will cause expansion to fail on node
allocatedResource: "9Gi", allocatedResource: "9Gi",
simulatedCSIDriverError: expansionFailedOnNode, disableControllerExpansion: false,
simulatedCSIDriverError: expansionFailedOnNodeWithInfeasibleError,
expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInfeasible, expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInfeasible,
recoverySize: resource.MustParse("5Gi"), recoverySize: resource.MustParse("5Gi"),
}, },
{
name: "recovery should be possible for node-only expanded volumes with infeasible error",
pvcRequestSize: "9Gi", // expansion to 9Gi will cause expansion to fail on node
allocatedResource: "9Gi",
disableControllerExpansion: true,
simulatedCSIDriverError: expansionFailedOnNodeWithInfeasibleError,
expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInfeasible,
recoverySize: resource.MustParse("5Gi"),
},
{
name: "recovery should be possible for node-only expanded volumes with final error",
pvcRequestSize: "9Gi", // expansion to 9Gi will cause expansion to fail on node
allocatedResource: "9Gi",
disableControllerExpansion: true,
simulatedCSIDriverError: expansionFailedOnNodeWithFinalError,
expectedResizeStatus: v1.PersistentVolumeClaimNodeResizeInProgress,
recoverySize: resource.MustParse("5Gi"),
},
} }
for _, t := range tests { for _, t := range tests {
test := t test := t
ginkgo.It(test.name, func(ctx context.Context) { ginkgo.It(test.name, func(ctx context.Context) {
var err error var err error
params := testParameters{enableResizing: true, enableNodeExpansion: true, enableRecoverExpansionFailure: true} params := testParameters{enableResizing: true, enableNodeExpansion: true, enableRecoverExpansionFailure: true, disableControllerExpansion: test.disableControllerExpansion}
if test.simulatedCSIDriverError != expansionSuccess { if test.simulatedCSIDriverError != expansionSuccess {
params.hooks = createExpansionHook(test.simulatedCSIDriverError) params.hooks = createExpansionHook(test.simulatedCSIDriverError)
@ -476,9 +509,15 @@ func validateRecoveryBehaviour(ctx context.Context, pvc *v1.PersistentVolumeClai
err = waitForAllocatedResource(ctx, pvc, m, test.allocatedResource) err = waitForAllocatedResource(ctx, pvc, m, test.allocatedResource)
framework.ExpectNoError(err, "While waiting for allocated resource to be updated") framework.ExpectNoError(err, "While waiting for allocated resource to be updated")
if test.expectedResizeStatus == v1.PersistentVolumeClaimNodeResizeInfeasible {
ginkgo.By("Waiting for kubelet to fail expansion on the node")
err = waitForResizeToFailOnNode(ctx, pvc, m.cs)
framework.ExpectNoError(err, "While waiting for resize status to be set")
} else {
ginkgo.By("Waiting for resizer to set resize status") ginkgo.By("Waiting for resizer to set resize status")
err = waitForResizeStatus(ctx, pvc, m.cs, test.expectedResizeStatus) err = waitForResizeStatus(ctx, pvc, m.cs, test.expectedResizeStatus)
framework.ExpectNoError(err, "While waiting for resize status to be set") framework.ExpectNoError(err, "While waiting for resize status to be set")
}
ginkgo.By("Recover pvc size") ginkgo.By("Recover pvc size")
newPVC, err := testsuites.ExpandPVCSize(ctx, pvc, test.recoverySize, m.cs) newPVC, err := testsuites.ExpandPVCSize(ctx, pvc, test.recoverySize, m.cs)
@ -492,16 +531,25 @@ func validateRecoveryBehaviour(ctx context.Context, pvc *v1.PersistentVolumeClai
} }
// if expansion failed on controller with final error, then recovery should be possible // if expansion failed on controller with final error, then recovery should be possible
if test.simulatedCSIDriverError == expansionFailedOnController { if test.simulatedCSIDriverError == expansionFailedOnControllerWithInfeasibleError {
validateExpansionSuccess(ctx, pvc, m, test, test.recoverySize.String())
return
}
// if expansion failed on node with final error but volume was only expanded on the node
// then recovery should be possible
if test.disableControllerExpansion &&
(test.simulatedCSIDriverError == expansionFailedOnNodeWithInfeasibleError ||
test.simulatedCSIDriverError == expansionFailedOnNodeWithFinalError) {
validateExpansionSuccess(ctx, pvc, m, test, test.recoverySize.String()) validateExpansionSuccess(ctx, pvc, m, test, test.recoverySize.String())
return return
} }
// if expansion succeeded on controller but failed on the node // if expansion succeeded on controller but failed on the node
if test.simulatedCSIDriverError == expansionFailedOnNode { if test.simulatedCSIDriverError == expansionFailedOnNodeWithInfeasibleError {
ginkgo.By("Wait for expansion to fail on node again") ginkgo.By("Wait for expansion to fail on node again")
err = waitForResizeStatus(ctx, pvc, m.cs, v1.PersistentVolumeClaimNodeResizeInfeasible) err = waitForResizeToFailOnNode(ctx, pvc, m.cs)
framework.ExpectNoError(err, "While waiting for resize status to be set to expansion-failed-on-node") framework.ExpectNoError(err, "While waiting for resize status to be set")
ginkgo.By("verify allocated resources after recovery") ginkgo.By("verify allocated resources after recovery")
pvc, err = m.cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{}) pvc, err = m.cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
@ -520,11 +568,11 @@ func validateRecoveryBehaviour(ctx context.Context, pvc *v1.PersistentVolumeClai
func validateExpansionSuccess(ctx context.Context, pvc *v1.PersistentVolumeClaim, m *mockDriverSetup, test recoveryTest, expectedAllocatedSize string) { func validateExpansionSuccess(ctx context.Context, pvc *v1.PersistentVolumeClaim, m *mockDriverSetup, test recoveryTest, expectedAllocatedSize string) {
var err error var err error
ginkgo.By("Waiting for persistent volume resize to finish") ginkgo.By(fmt.Sprintf("Waiting for PV %s to be expanded to %s", pvc.Spec.VolumeName, test.recoverySize.String()))
err = testsuites.WaitForControllerVolumeResize(ctx, pvc, m.cs, csiResizeWaitPeriod) err = testsuites.WaitForControllerVolumeResize(ctx, pvc, m.cs, csiResizeWaitPeriod)
framework.ExpectNoError(err, "While waiting for PV resize to finish") framework.ExpectNoError(err, "While waiting for PV resize to finish")
ginkgo.By("Waiting for PVC resize to finish") ginkgo.By(fmt.Sprintf("Waiting for PVC %s to be expanded to %s", pvc.Name, test.recoverySize.String()))
pvc, err = testsuites.WaitForFSResize(ctx, pvc, m.cs) pvc, err = testsuites.WaitForFSResize(ctx, pvc, m.cs)
framework.ExpectNoError(err, "while waiting for PVC to finish") framework.ExpectNoError(err, "while waiting for PVC to finish")
@ -561,6 +609,31 @@ func waitForResizeStatus(ctx context.Context, pvc *v1.PersistentVolumeClaim, c c
return nil return nil
} }
func waitForResizeToFailOnNode(ctx context.Context, pvc *v1.PersistentVolumeClaim, c clientset.Interface) error {
var finalConditions []v1.PersistentVolumeClaimCondition
waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, csiResizeWaitPeriod, true, func(pollContext context.Context) (bool, error) {
var err error
updatedPVC, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pollContext, pvc.Name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error fetching pvc %q for checking for resize status: %w", pvc.Name, err)
}
pvcConditions := updatedPVC.Status.Conditions
for _, cond := range pvcConditions {
if cond.Type == v1.PersistentVolumeClaimNodeResizeError {
return true, nil
}
}
finalConditions = pvcConditions
return false, nil
})
if waitErr != nil {
return fmt.Errorf("error while waiting for resize condition sync to NodeResizeError, actualStatus %+v: %w", finalConditions, waitErr)
}
return nil
}
func waitForAllocatedResource(ctx context.Context, pvc *v1.PersistentVolumeClaim, m *mockDriverSetup, expectedSize string) error { func waitForAllocatedResource(ctx context.Context, pvc *v1.PersistentVolumeClaim, m *mockDriverSetup, expectedSize string) error {
expectedQuantity := resource.MustParse(expectedSize) expectedQuantity := resource.MustParse(expectedSize)
waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, csiResizeWaitPeriod, true, func(pollContext context.Context) (bool, error) { waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, csiResizeWaitPeriod, true, func(pollContext context.Context) (bool, error) {
@ -596,7 +669,7 @@ func createExpansionHook(expectedExpansionStatus expansionStatus) *drivers.Hooks
} }
} }
case expansionFailedOnController: case expansionFailedOnControllerWithInfeasibleError:
expansionRequest, ok := request.(*csipbv1.ControllerExpandVolumeRequest) expansionRequest, ok := request.(*csipbv1.ControllerExpandVolumeRequest)
if ok { if ok {
requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI) requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI)
@ -604,7 +677,16 @@ func createExpansionHook(expectedExpansionStatus expansionStatus) *drivers.Hooks
return nil, status.Error(codes.InvalidArgument, "invalid expansion request") return nil, status.Error(codes.InvalidArgument, "invalid expansion request")
} }
} }
case expansionFailedOnNode: case expansionFailedOnControllerWithFinalError:
// This simulates a condition that a final, but not infeasible error is returned when expansion fails in the controller.
expansionRequest, ok := request.(*csipbv1.ControllerExpandVolumeRequest)
if ok {
requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI)
if requestedSize.Cmp(maxControllerSizeLimit) > 0 {
return nil, status.Error(codes.PermissionDenied, "permission denied for expansion")
}
}
case expansionFailedOnNodeWithInfeasibleError:
expansionRequest, ok := request.(*csipbv1.NodeExpandVolumeRequest) expansionRequest, ok := request.(*csipbv1.NodeExpandVolumeRequest)
if ok { if ok {
requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI) requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI)
@ -613,6 +695,14 @@ func createExpansionHook(expectedExpansionStatus expansionStatus) *drivers.Hooks
} }
} }
case expansionFailedOnNodeWithFinalError:
expansionRequest, ok := request.(*csipbv1.NodeExpandVolumeRequest)
if ok {
requestedSize := resource.NewQuantity(expansionRequest.CapacityRange.RequiredBytes, resource.BinarySI)
if requestedSize.Cmp(maxNodeExpansionLimit) > 0 {
return nil, status.Error(codes.PermissionDenied, "permission denied for expansion")
}
}
} }
return nil, nil return nil, nil

View File

@ -350,6 +350,7 @@ type mockCSIDriver struct {
embeddedCSIDriver *mockdriver.CSIDriver embeddedCSIDriver *mockdriver.CSIDriver
enableSELinuxMount *bool enableSELinuxMount *bool
enableRecoverExpansionFailure bool enableRecoverExpansionFailure bool
disableControllerExpansion bool
enableHonorPVReclaimPolicy bool enableHonorPVReclaimPolicy bool
// Additional values set during PrepareTest // Additional values set during PrepareTest
@ -392,6 +393,7 @@ type CSIMockDriverOpts struct {
EnableTopology bool EnableTopology bool
EnableResizing bool EnableResizing bool
EnableNodeExpansion bool EnableNodeExpansion bool
DisableControllerExpansion bool
EnableSnapshot bool EnableSnapshot bool
EnableVolumeMountGroup bool EnableVolumeMountGroup bool
EnableNodeVolumeCondition bool EnableNodeVolumeCondition bool
@ -550,6 +552,7 @@ func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
attachLimit: driverOpts.AttachLimit, attachLimit: driverOpts.AttachLimit,
enableNodeExpansion: driverOpts.EnableNodeExpansion, enableNodeExpansion: driverOpts.EnableNodeExpansion,
enableNodeVolumeCondition: driverOpts.EnableNodeVolumeCondition, enableNodeVolumeCondition: driverOpts.EnableNodeVolumeCondition,
disableControllerExpansion: driverOpts.DisableControllerExpansion,
tokenRequests: driverOpts.TokenRequests, tokenRequests: driverOpts.TokenRequests,
requiresRepublish: driverOpts.RequiresRepublish, requiresRepublish: driverOpts.RequiresRepublish,
fsGroupPolicy: driverOpts.FSGroupPolicy, fsGroupPolicy: driverOpts.FSGroupPolicy,
@ -628,6 +631,7 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
DriverName: "csi-mock-" + f.UniqueName, DriverName: "csi-mock-" + f.UniqueName,
AttachLimit: int64(m.attachLimit), AttachLimit: int64(m.attachLimit),
NodeExpansionRequired: m.enableNodeExpansion, NodeExpansionRequired: m.enableNodeExpansion,
DisableControllerExpansion: m.disableControllerExpansion,
NodeVolumeConditionRequired: m.enableNodeVolumeCondition, NodeVolumeConditionRequired: m.enableNodeVolumeCondition,
VolumeMountGroupRequired: m.enableVolumeMountGroup, VolumeMountGroupRequired: m.enableVolumeMountGroup,
EnableTopology: m.enableTopology, EnableTopology: m.enableTopology,

View File

@ -184,7 +184,7 @@ func (v *volumeExpandTestSuite) DefineTests(driver storageframework.TestDriver,
newSize := currentPvcSize.DeepCopy() newSize := currentPvcSize.DeepCopy()
newSize.Add(resource.MustParse("1Gi")) newSize.Add(resource.MustParse("1Gi"))
framework.Logf("currentPvcSize %v, newSize %v", currentPvcSize, newSize) framework.Logf("currentPvcSize %v, newSize %v", currentPvcSize, newSize)
_, err = ExpandPVCSize(ctx, l.resource.Pvc, newSize, f.ClientSet) _, err = ExpandPVCSizeToError(ctx, l.resource.Pvc, newSize, f.ClientSet)
gomega.Expect(err).To(gomega.MatchError(apierrors.IsForbidden, "While updating non-expandable PVC")) gomega.Expect(err).To(gomega.MatchError(apierrors.IsForbidden, "While updating non-expandable PVC"))
}) })
} else { } else {
@ -318,15 +318,15 @@ func ExpandPVCSize(ctx context.Context, origPVC *v1.PersistentVolumeClaim, size
// Retry the update on error, until we hit a timeout. // Retry the update on error, until we hit a timeout.
// TODO: Determine whether "retry with timeout" is appropriate here. Maybe we should only retry on version conflict. // TODO: Determine whether "retry with timeout" is appropriate here. Maybe we should only retry on version conflict.
var lastUpdateError error var lastUpdateError error
waitErr := wait.PollImmediate(resizePollInterval, 30*time.Second, func() (bool, error) { waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, 30*time.Second, true, func(pollContext context.Context) (bool, error) {
var err error var err error
updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Get(ctx, pvcName, metav1.GetOptions{}) updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Get(pollContext, pvcName, metav1.GetOptions{})
if err != nil { if err != nil {
return false, fmt.Errorf("error fetching pvc %q for resizing: %w", pvcName, err) return false, fmt.Errorf("error fetching pvc %q for resizing: %w", pvcName, err)
} }
updatedPVC.Spec.Resources.Requests[v1.ResourceStorage] = size updatedPVC.Spec.Resources.Requests[v1.ResourceStorage] = size
updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Update(ctx, updatedPVC, metav1.UpdateOptions{}) updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Update(pollContext, updatedPVC, metav1.UpdateOptions{})
if err != nil { if err != nil {
framework.Logf("Error updating pvc %s: %v", pvcName, err) framework.Logf("Error updating pvc %s: %v", pvcName, err)
lastUpdateError = err lastUpdateError = err
@ -343,6 +343,42 @@ func ExpandPVCSize(ctx context.Context, origPVC *v1.PersistentVolumeClaim, size
return updatedPVC, nil return updatedPVC, nil
} }
func ExpandPVCSizeToError(ctx context.Context, origPVC *v1.PersistentVolumeClaim, size resource.Quantity, c clientset.Interface) (*v1.PersistentVolumeClaim, error) {
pvcName := origPVC.Name
updatedPVC := origPVC.DeepCopy()
var lastUpdateError error
waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, 30*time.Second, true, func(pollContext context.Context) (bool, error) {
var err error
updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Get(pollContext, pvcName, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error fetching pvc %q for resizing: %w", pvcName, err)
}
updatedPVC.Spec.Resources.Requests[v1.ResourceStorage] = size
updatedPVC, err = c.CoreV1().PersistentVolumeClaims(origPVC.Namespace).Update(pollContext, updatedPVC, metav1.UpdateOptions{})
if err == nil {
return false, fmt.Errorf("pvc %s should not be allowed to be updated", pvcName)
} else {
lastUpdateError = err
if apierrors.IsForbidden(err) {
return true, nil
}
framework.Logf("Error updating pvc %s: %v", pvcName, err)
return false, nil
}
})
if wait.Interrupted(waitErr) {
return nil, fmt.Errorf("timed out attempting to update PVC size. last update error: %w", lastUpdateError)
}
if waitErr != nil {
return nil, fmt.Errorf("failed to expand PVC size (check logs for error): %w", waitErr)
}
return updatedPVC, lastUpdateError
}
// WaitForResizingCondition waits for the pvc condition to be PersistentVolumeClaimResizing // WaitForResizingCondition waits for the pvc condition to be PersistentVolumeClaimResizing
func WaitForResizingCondition(ctx context.Context, pvc *v1.PersistentVolumeClaim, c clientset.Interface, duration time.Duration) error { func WaitForResizingCondition(ctx context.Context, pvc *v1.PersistentVolumeClaim, c clientset.Interface, duration time.Duration) error {
waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, duration, true, func(ctx context.Context) (bool, error) { waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, duration, true, func(ctx context.Context) (bool, error) {
@ -423,9 +459,9 @@ func WaitForPendingFSResizeCondition(ctx context.Context, pvc *v1.PersistentVolu
// WaitForFSResize waits for the filesystem in the pv to be resized // WaitForFSResize waits for the filesystem in the pv to be resized
func WaitForFSResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, c clientset.Interface) (*v1.PersistentVolumeClaim, error) { func WaitForFSResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, c clientset.Interface) (*v1.PersistentVolumeClaim, error) {
var updatedPVC *v1.PersistentVolumeClaim var updatedPVC *v1.PersistentVolumeClaim
waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, totalResizeWaitPeriod, true, func(ctx context.Context) (bool, error) { waitErr := wait.PollUntilContextTimeout(ctx, resizePollInterval, totalResizeWaitPeriod, true, func(pollContext context.Context) (bool, error) {
var err error var err error
updatedPVC, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{}) updatedPVC, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pollContext, pvc.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return false, fmt.Errorf("error fetching pvc %q for checking for resize status : %w", pvc.Name, err) return false, fmt.Errorf("error fetching pvc %q for checking for resize status : %w", pvc.Name, err)