Merge pull request #108693 from gnufied/enable-rwx-call-all-nodes

Enable node-expansion to be called on all nodes for RWX volumes
This commit is contained in:
Kubernetes Prow Robot 2022-03-29 17:35:05 -07:00 committed by GitHub
commit 1266744002
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 941 additions and 554 deletions

View File

@ -26,6 +26,8 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -581,6 +583,16 @@ func (asw *actualStateOfWorld) GetAttachState(
return AttachStateDetached return AttachStateDetached
} }
// SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller.
func (asw *actualStateOfWorld) InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) {
klog.V(5).Infof("no-op InitializeClaimSize call in attach-detach controller.")
}
func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity {
// not needed in attach-detach controller
return nil
}
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
asw.RLock() asw.RLock()
defer asw.RUnlock() defer asw.RUnlock()

View File

@ -25,6 +25,7 @@ import (
"sync" "sync"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
@ -106,7 +107,7 @@ type ActualStateOfWorld interface {
// volumes, depend on this to update the contents of the volume. // volumes, depend on this to update the contents of the volume.
// All volume mounting calls should be idempotent so a second mount call for // All volume mounting calls should be idempotent so a second mount call for
// volumes that do not need to update contents should not fail. // volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error)
// PodRemovedFromVolume returns true if the given pod does not exist in the list of // PodRemovedFromVolume returns true if the given pod does not exist in the list of
// mountedPods for the given volume in the cache, indicating that the pod has // mountedPods for the given volume in the cache, indicating that the pod has
@ -160,11 +161,6 @@ type ActualStateOfWorld interface {
// no longer referenced and may be globally unmounted and detached. // no longer referenced and may be globally unmounted and detached.
GetUnmountedVolumes() []AttachedVolume GetUnmountedVolumes() []AttachedVolume
// MarkFSResizeRequired marks each volume that is successfully attached and
// mounted for the specified pod as requiring file system resize (if the plugin for the
// volume indicates it requires file system resize).
MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName)
// GetAttachedVolumes returns a list of volumes that is known to be attached // GetAttachedVolumes returns a list of volumes that is known to be attached
// to the node. This list can be used to determine volumes that are either in-use // to the node. This list can be used to determine volumes that are either in-use
// or have a mount/unmount operation pending. // or have a mount/unmount operation pending.
@ -284,6 +280,10 @@ type attachedVolume struct {
// volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error // volumeInUseErrorForExpansion indicates volume driver has previously returned volume-in-use error
// for this volume and volume expansion on this node should not be retried // for this volume and volume expansion on this node should not be retried
volumeInUseErrorForExpansion bool volumeInUseErrorForExpansion bool
// persistentVolumeSize records size of the volume when pod was started or
// size after successful completion of volume expansion operation.
persistentVolumeSize *resource.Quantity
} }
// The mountedPod object represents a pod for which the kubelet volume manager // The mountedPod object represents a pod for which the kubelet volume manager
@ -325,10 +325,6 @@ type mountedPod struct {
// volumeGidValue contains the value of the GID annotation, if present. // volumeGidValue contains the value of the GID annotation, if present.
volumeGidValue string volumeGidValue string
// fsResizeRequired indicates the underlying volume has been successfully
// mounted to this pod but its size has been expanded after that.
fsResizeRequired bool
// volumeMountStateForPod stores state of volume mount for the pod. if it is: // volumeMountStateForPod stores state of volume mount for the pod. if it is:
// - VolumeMounted: means volume for pod has been successfully mounted // - VolumeMounted: means volume for pod has been successfully mounted
// - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted
@ -548,30 +544,17 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
return nil return nil
} }
func (asw *actualStateOfWorld) MarkVolumeAsResized( func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool {
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) error {
asw.Lock() asw.Lock()
defer asw.Unlock() defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName] volumeObj, ok := asw.attachedVolumes[volumeName]
if !volumeExists { if ok {
return fmt.Errorf( volumeObj.persistentVolumeSize = claimSize
"no volume with the name %q exists in the list of attached volumes", asw.attachedVolumes[volumeName] = volumeObj
volumeName) return true
} }
return false
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
return fmt.Errorf(
"no pod with the name %q exists in the mounted pods list of volume %s",
podName,
volumeName)
}
klog.V(5).InfoS("Pod volume has been resized", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
podObj.fsResizeRequired = false
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
return nil
} }
func (asw *actualStateOfWorld) MarkRemountRequired( func (asw *actualStateOfWorld) MarkRemountRequired(
@ -596,40 +579,6 @@ func (asw *actualStateOfWorld) MarkRemountRequired(
} }
} }
func (asw *actualStateOfWorld) MarkFSResizeRequired(
volumeName v1.UniqueVolumeName,
podName volumetypes.UniquePodName) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
klog.InfoS("MarkFSResizeRequired for volume failed as volume does not exist", "volumeName", volumeName)
return
}
podObj, podExists := volumeObj.mountedPods[podName]
if !podExists {
klog.InfoS("MarkFSResizeRequired for volume failed because the pod does not exist", "uniquePodName", podName, "volumeName", volumeName)
return
}
volumePlugin, err :=
asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.ErrorS(nil, "MarkFSResizeRequired failed to find expandable plugin for volume", "uniquePodName", podObj.podName, "volumeName", volumeObj.volumeName, "volumeSpecName", podObj.volumeSpec.Name())
return
}
if volumePlugin.RequiresFSResize() {
if !podObj.fsResizeRequired {
klog.V(3).InfoS("PVC volume of the pod requires file system resize", "uniquePodName", podName, "volumeName", volumeName, "outerVolumeSpecName", podObj.outerVolumeSpecName)
podObj.fsResizeRequired = true
}
asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
}
}
func (asw *actualStateOfWorld) SetDeviceMountState( func (asw *actualStateOfWorld) SetDeviceMountState(
volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error { volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error {
asw.Lock() asw.Lock()
@ -651,6 +600,30 @@ func (asw *actualStateOfWorld) SetDeviceMountState(
return nil return nil
} }
func (asw *actualStateOfWorld) InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) {
asw.Lock()
defer asw.Unlock()
volumeObj, ok := asw.attachedVolumes[volumeName]
// only set volume claim size if claimStatusSize is zero
// this can happen when volume was rebuilt after kubelet startup
if ok && volumeObj.persistentVolumeSize == nil {
volumeObj.persistentVolumeSize = claimSize
asw.attachedVolumes[volumeName] = volumeObj
}
}
func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity {
asw.RLock()
defer asw.RUnlock()
volumeObj, ok := asw.attachedVolumes[volumeName]
if ok {
return volumeObj.persistentVolumeSize
}
return nil
}
func (asw *actualStateOfWorld) DeletePodFromVolume( func (asw *actualStateOfWorld) DeletePodFromVolume(
podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error { podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error {
asw.Lock() asw.Lock()
@ -691,9 +664,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
return nil return nil
} }
func (asw *actualStateOfWorld) PodExistsInVolume( func (asw *actualStateOfWorld) PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize resource.Quantity) (bool, string, error) {
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) (bool, string, error) {
asw.RLock() asw.RLock()
defer asw.RUnlock() defer asw.RUnlock()
@ -711,15 +682,42 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
if podObj.remountRequired { if podObj.remountRequired {
return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
} }
if podObj.fsResizeRequired && if currentSize, expandVolume := asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize); expandVolume {
!volumeObj.volumeInUseErrorForExpansion { return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName, currentSize)
return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
} }
} }
return podExists, volumeObj.devicePath, nil return podExists, volumeObj.devicePath, nil
} }
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) {
currentSize := resource.Quantity{}
if volumeObj.persistentVolumeSize != nil {
currentSize = volumeObj.persistentVolumeSize.DeepCopy()
}
if volumeObj.volumeInUseErrorForExpansion {
return currentSize, false
}
if volumeObj.persistentVolumeSize == nil || desiredVolumeSize.IsZero() {
return currentSize, false
}
if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 {
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.InfoS("PodExistsInVolume failed to find expandable plugin",
"volume", volumeObj.volumeName,
"volumeSpecName", volumeObj.spec.Name())
return currentSize, false
}
if volumePlugin.RequiresFSResize() {
return currentSize, true
}
}
return currentSize, false
}
func (asw *actualStateOfWorld) PodRemovedFromVolume( func (asw *actualStateOfWorld) PodRemovedFromVolume(
podName volumetypes.UniquePodName, podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) bool { volumeName v1.UniqueVolumeName) bool {
@ -964,29 +962,31 @@ func newRemountRequiredError(
// fsResizeRequiredError is an error returned when PodExistsInVolume() found // fsResizeRequiredError is an error returned when PodExistsInVolume() found
// volume/pod attached/mounted but fsResizeRequired was true, indicating the // volume/pod attached/mounted but fsResizeRequired was true, indicating the
// given volume receives an resize request after attached/mounted. // given volume receives an resize request after attached/mounted.
type fsResizeRequiredError struct { type FsResizeRequiredError struct {
volumeName v1.UniqueVolumeName CurrentSize resource.Quantity
podName volumetypes.UniquePodName volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
} }
func (err fsResizeRequiredError) Error() string { func (err FsResizeRequiredError) Error() string {
return fmt.Sprintf( return fmt.Sprintf(
"volumeName %q mounted to %q needs to resize file system", "volumeName %q mounted to %q needs to resize file system",
err.volumeName, err.podName) err.volumeName, err.podName)
} }
func newFsResizeRequiredError( func newFsResizeRequiredError(
volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error { volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, currentSize resource.Quantity) error {
return fsResizeRequiredError{ return FsResizeRequiredError{
volumeName: volumeName, CurrentSize: currentSize,
podName: podName, volumeName: volumeName,
podName: podName,
} }
} }
// IsFSResizeRequiredError returns true if the specified error is a // IsFSResizeRequiredError returns true if the specified error is a
// fsResizeRequiredError. // fsResizeRequiredError.
func IsFSResizeRequiredError(err error) bool { func IsFSResizeRequiredError(err error) bool {
_, ok := err.(fsResizeRequiredError) _, ok := err.(FsResizeRequiredError)
return ok return ok
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache package cache
import ( import (
"k8s.io/apimachinery/pkg/api/resource"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -676,7 +677,7 @@ func TestUncertainVolumeMounts(t *testing.T) {
t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name()) t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
} }
volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1) volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, resource.Quantity{})
if volExists { if volExists {
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
} }
@ -762,7 +763,7 @@ func verifyPodExistsInVolumeAsw(
expectedDevicePath string, expectedDevicePath string,
asw ActualStateOfWorld) { asw ActualStateOfWorld) {
podExistsInVolume, devicePath, err := podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(expectedPodName, expectedVolumeName) asw.PodExistsInVolume(expectedPodName, expectedVolumeName, resource.Quantity{})
if err != nil { if err != nil {
t.Fatalf( t.Fatalf(
"ASW PodExistsInVolume failed. Expected: <no error> Actual: <%v>", err) "ASW PodExistsInVolume failed. Expected: <no error> Actual: <%v>", err)
@ -804,7 +805,7 @@ func verifyPodDoesntExistInVolumeAsw(
expectVolumeToExist bool, expectVolumeToExist bool,
asw ActualStateOfWorld) { asw ActualStateOfWorld) {
podExistsInVolume, devicePath, err := podExistsInVolume, devicePath, err :=
asw.PodExistsInVolume(podToCheck, volumeToCheck) asw.PodExistsInVolume(podToCheck, volumeToCheck, resource.Quantity{})
if !expectVolumeToExist && err == nil { if !expectVolumeToExist && err == nil {
t.Fatalf( t.Fatalf(
"ASW PodExistsInVolume did not return error. Expected: <error indicating volume does not exist> Actual: <%v>", err) "ASW PodExistsInVolume did not return error. Expected: <error indicating volume does not exist> Actual: <%v>", err)

View File

@ -124,6 +124,11 @@ type DesiredStateOfWorld interface {
// MarkVolumeAttachability updates the volume's attachability for a given volume // MarkVolumeAttachability updates the volume's attachability for a given volume
MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool) MarkVolumeAttachability(volumeName v1.UniqueVolumeName, attachable bool)
// UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world
// so as it can be compared against actual size and volume expansion performed
// if necessary
UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity)
} }
// VolumeToMount represents a volume that is attached to this node and needs to // VolumeToMount represents a volume that is attached to this node and needs to
@ -186,6 +191,10 @@ type volumeToMount struct {
// desiredSizeLimit indicates the desired upper bound on the size of the volume // desiredSizeLimit indicates the desired upper bound on the size of the volume
// (if so implemented) // (if so implemented)
desiredSizeLimit *resource.Quantity desiredSizeLimit *resource.Quantity
// persistentVolumeSize records desired size of a persistent volume.
// Usually this value reflects size recorded in pv.Spec.Capacity
persistentVolumeSize *resource.Quantity
} }
// The pod object represents a pod that references the underlying volume and // The pod object represents a pod that references the underlying volume and
@ -274,7 +283,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
} }
} }
} }
dsw.volumesToMount[volumeName] = volumeToMount{ vmt := volumeToMount{
volumeName: volumeName, volumeName: volumeName,
podsToMount: make(map[types.UniquePodName]podToMount), podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: attachable, pluginIsAttachable: attachable,
@ -283,6 +292,16 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
reportedInUse: false, reportedInUse: false,
desiredSizeLimit: sizeLimit, desiredSizeLimit: sizeLimit,
} }
// record desired size of the volume
if volumeSpec.PersistentVolume != nil {
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage()
if pvCap != nil {
pvCapCopy := pvCap.DeepCopy()
vmt.persistentVolumeSize = &pvCapCopy
}
}
dsw.volumesToMount[volumeName] = vmt
} }
oldPodMount, ok := dsw.volumesToMount[volumeName].podsToMount[podName] oldPodMount, ok := dsw.volumesToMount[volumeName].podsToMount[podName]
mountRequestTime := time.Now() mountRequestTime := time.Now()
@ -347,6 +366,19 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume(
} }
} }
// UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and
// should be only used for persistent volumes.
func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) {
dsw.Lock()
defer dsw.Unlock()
vol, volExists := dsw.volumesToMount[volumeName]
if volExists {
vol.persistentVolumeSize = size
dsw.volumesToMount[volumeName] = vol
}
}
func (dsw *desiredStateOfWorld) VolumeExists( func (dsw *desiredStateOfWorld) VolumeExists(
volumeName v1.UniqueVolumeName) bool { volumeName v1.UniqueVolumeName) bool {
dsw.RLock() dsw.RLock()
@ -403,21 +435,25 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */) volumesToMount := make([]VolumeToMount, 0 /* len */, len(dsw.volumesToMount) /* cap */)
for volumeName, volumeObj := range dsw.volumesToMount { for volumeName, volumeObj := range dsw.volumesToMount {
for podName, podObj := range volumeObj.podsToMount { for podName, podObj := range volumeObj.podsToMount {
volumesToMount = append( vmt := VolumeToMount{
volumesToMount, VolumeToMount: operationexecutor.VolumeToMount{
VolumeToMount{ VolumeName: volumeName,
VolumeToMount: operationexecutor.VolumeToMount{ PodName: podName,
VolumeName: volumeName, Pod: podObj.pod,
PodName: podName, VolumeSpec: podObj.volumeSpec,
Pod: podObj.pod, PluginIsAttachable: volumeObj.pluginIsAttachable,
VolumeSpec: podObj.volumeSpec, PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable,
PluginIsAttachable: volumeObj.pluginIsAttachable, OuterVolumeSpecName: podObj.outerVolumeSpecName,
PluginIsDeviceMountable: volumeObj.pluginIsDeviceMountable, VolumeGidValue: volumeObj.volumeGidValue,
OuterVolumeSpecName: podObj.outerVolumeSpecName, ReportedInUse: volumeObj.reportedInUse,
VolumeGidValue: volumeObj.volumeGidValue, MountRequestTime: podObj.mountRequestTime,
ReportedInUse: volumeObj.reportedInUse, DesiredSizeLimit: volumeObj.desiredSizeLimit,
MountRequestTime: podObj.mountRequestTime, },
DesiredSizeLimit: volumeObj.desiredSizeLimit}}) }
if volumeObj.persistentVolumeSize != nil {
vmt.PersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy()
}
volumesToMount = append(volumesToMount, vmt)
} }
} }
return volumesToMount return volumesToMount

View File

@ -195,13 +195,12 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
} }
processedVolumesForFSResize := sets.NewString()
for _, pod := range dswp.podManager.GetPods() { for _, pod := range dswp.podManager.GetPods() {
if dswp.podStateProvider.ShouldPodContainersBeTerminating(pod.UID) { if dswp.podStateProvider.ShouldPodContainersBeTerminating(pod.UID) {
// Do not (re)add volumes for pods that can't also be starting containers // Do not (re)add volumes for pods that can't also be starting containers
continue continue
} }
dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize) dswp.processPodVolumes(pod, mountedVolumesForPod)
} }
} }
@ -270,8 +269,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
// desired state of the world. // desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes( func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
pod *v1.Pod, pod *v1.Pod,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) {
processedVolumesForFSResize sets.String) {
if pod == nil { if pod == nil {
return return
} }
@ -314,9 +312,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
// sync reconstructed volume // sync reconstructed volume
dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name) dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name)
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod)
uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
} }
// some of the volume additions may have failed, should not mark this pod as fully processed // some of the volume additions may have failed, should not mark this pod as fully processed
@ -337,21 +333,16 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
} }
// checkVolumeFSResize checks whether a PVC mounted by the pod requires file // checkVolumeFSResize records desired PVC size for a volume mounted by the pod.
// system resize or not. If so, marks this volume as fsResizeRequired in ASW. // It is used for comparison with actual size(coming from pvc.Status.Capacity) and calling
// - mountedVolumesForPod stores all mounted volumes in ASW, because online // volume expansion on the node if needed.
// volume resize only considers mounted volumes.
// - processedVolumesForFSResize stores all volumes we have checked in current loop,
// because file system resize operation is a global operation for volume, so
// we only need to check it once if more than one pod use it.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
pod *v1.Pod, pod *v1.Pod,
podVolume v1.Volume, podVolume v1.Volume,
pvc *v1.PersistentVolumeClaim, pvc *v1.PersistentVolumeClaim,
volumeSpec *volume.Spec, volumeSpec *volume.Spec,
uniquePodName volumetypes.UniquePodName, uniquePodName volumetypes.UniquePodName,
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) {
processedVolumesForFSResize sets.String) {
if podVolume.PersistentVolumeClaim == nil { if podVolume.PersistentVolumeClaim == nil {
// Only PVC supports resize operation. // Only PVC supports resize operation.
return return
@ -363,11 +354,6 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
// or online resize in subsequent loop(after we confirm it has been mounted). // or online resize in subsequent loop(after we confirm it has been mounted).
return return
} }
if processedVolumesForFSResize.Has(string(uniqueVolumeName)) {
// File system resize operation is a global operation for volume,
// so we only need to check it once if more than one pod use it.
return
}
// volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted. // volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted.
// This is the same flag that determines filesystem resizing behaviour for offline resizing and hence // This is the same flag that determines filesystem resizing behaviour for offline resizing and hence
// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly. // we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
@ -376,10 +362,12 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name) klog.V(5).InfoS("Skip file system resize check for the volume, as the volume is mounted as readonly", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
return return
} }
if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) { pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage()
dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName) pvcStatusCap := pvc.Status.Capacity.Storage()
} dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap)
processedVolumesForFSResize.Insert(string(uniqueVolumeName))
// in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value
dswp.actualStateOfWorld.InitializeClaimSize(uniqueVolumeName, pvcStatusCap)
} }
func getUniqueVolumeName( func getUniqueVolumeName(
@ -397,12 +385,6 @@ func getUniqueVolumeName(
return mountedVolume.VolumeName, true return mountedVolume.VolumeName, true
} }
func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
capacity := pvc.Status.Capacity[v1.ResourceStorage]
requested := pv.Spec.Capacity[v1.ResourceStorage]
return requested.Cmp(capacity) > 0
}
// podPreviouslyProcessed returns true if the volumes for this pod have already // podPreviouslyProcessed returns true if the volumes for this pod have already
// been processed/reprocessed by the populator. Otherwise, the volumes for this pod need to // been processed/reprocessed by the populator. Otherwise, the volumes for this pod need to
// be reprocessed. // be reprocessed.

View File

@ -977,7 +977,7 @@ func TestCheckVolumeFSResize(t *testing.T) {
}, },
verify: func(t *testing.T, vols []v1.UniqueVolumeName, volName v1.UniqueVolumeName) { verify: func(t *testing.T, vols []v1.UniqueVolumeName, volName v1.UniqueVolumeName) {
if len(vols) == 0 { if len(vols) == 0 {
t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") t.Fatalf("Requested resize for volume, but volume in ASW hasn't been marked as fsResizeRequired")
} }
if len(vols) != 1 { if len(vols) != 1 {
t.Errorf("Some unexpected volumes are marked as fsResizeRequired: %v", vols) t.Errorf("Some unexpected volumes are marked as fsResizeRequired: %v", vols)
@ -1053,7 +1053,7 @@ func TestCheckVolumeFSResize(t *testing.T) {
func() { func() {
tc.resize(t, pv, pvc, dswp) tc.resize(t, pv, pvc, dswp)
resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW) resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW, *pv.Spec.Capacity.Storage())
tc.verify(t, resizeRequiredVolumes, uniqueVolumeName) tc.verify(t, resizeRequiredVolumes, uniqueVolumeName)
}() }()
@ -1099,16 +1099,16 @@ func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *te
} }
func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName,
dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
dswp.ReprocessPod(uniquePodName) dswp.ReprocessPod(uniquePodName)
dswp.findAndAddNewPods() dswp.findAndAddNewPods()
return getResizeRequiredVolumes(dsw, asw) return getResizeRequiredVolumes(dsw, asw, newSize)
} }
func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld, newSize resource.Quantity) []v1.UniqueVolumeName {
resizeRequiredVolumes := []v1.UniqueVolumeName{} resizeRequiredVolumes := []v1.UniqueVolumeName{}
for _, volumeToMount := range dsw.GetVolumesToMount() { for _, volumeToMount := range dsw.GetVolumesToMount() {
_, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, newSize)
if cache.IsFSResizeRequiredError(err) { if cache.IsFSResizeRequiredError(err) {
resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName)
} }
@ -1127,7 +1127,6 @@ func verifyVolumeExistsInVolumesToMount(t *testing.T, expectedVolumeName v1.Uniq
expectReportedInUse, expectReportedInUse,
volume.ReportedInUse) volume.ReportedInUse)
} }
return return
} }
} }

View File

@ -28,6 +28,8 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/mount-utils" "k8s.io/mount-utils"
utilpath "k8s.io/utils/path" utilpath "k8s.io/utils/path"
@ -169,7 +171,7 @@ func (rc *reconciler) reconcile() {
// attach if kubelet is responsible for attaching volumes. // attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume // If underlying PVC was resized while in-use then this function also handles volume
// resizing. // resizing.
rc.mountAttachVolumes() rc.mountOrAttachVolumes()
// Ensure devices that should be detached/unmounted are detached/unmounted. // Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices() rc.unmountDetachDevices()
@ -193,82 +195,95 @@ func (rc *reconciler) unmountVolumes() {
} }
} }
func (rc *reconciler) mountAttachVolumes() { func (rc *reconciler) mountOrAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted. // Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
volumeToMount.DevicePath = devicePath volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) { if cache.IsVolumeNotAttachedError(err) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { rc.waitForVolumeAttach(volumeToMount)
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
continue
}
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
} else if !volMounted || cache.IsRemountRequiredError(err) { } else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting rc.mountAttachedVolumes(volumeToMount, err)
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(err)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
} else {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
}
}
} else if cache.IsFSResizeRequiredError(err) { } else if cache.IsFSResizeRequiredError(err) {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
err := rc.operationExecutor.ExpandInUseVolume( rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
volumeToMount.VolumeToMount, }
rc.actualStateOfWorld) }
if err != nil && !isExpectedError(err) { }
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
} func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
if err == nil { klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
}
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
}
func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(podExistError)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
if remountingLogStr == "" {
klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
} else {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
}
}
}
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
return
}
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
}
} else {
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
if err != nil && !isExpectedError(err) {
klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
}
if err == nil {
klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
} }
} }
} }

View File

@ -1161,57 +1161,9 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
pv := &v1.PersistentVolume{ pv := getTestPV(tc.pvName, tc.volumeMode, tc.oldPVSize)
ObjectMeta: metav1.ObjectMeta{ pvc := getTestPVC("pv", tc.volumeMode, tc.pvcSize, tc.pvcStatusSize)
Name: tc.pvName, pod := getTestPod(pvc.Name)
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: tc.volumeMode,
Capacity: v1.ResourceList{
v1.ResourceStorage: tc.oldPVSize,
},
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: tc.pvcSize,
},
},
VolumeName: "pv",
VolumeMode: tc.volumeMode,
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: v1.ResourceList{
v1.ResourceStorage: tc.pvcStatusSize,
},
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
}
// deep copy before reconciler runs to avoid data race. // deep copy before reconciler runs to avoid data race.
pvWithSize := pv.DeepCopy() pvWithSize := pv.DeepCopy()
@ -1284,10 +1236,12 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
volumeSpec = &volume.Spec{PersistentVolume: pvWithSize} volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
// mark volume as resize required
asw.MarkFSResizeRequired(volumeName, podName)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
newSize := tc.newPVSize.DeepCopy()
dsw.UpdatePersistentVolumeSize(volumeName, &newSize)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize)
if tc.expansionFailed { if tc.expansionFailed {
if cache.IsFSResizeRequiredError(podExistErr) { if cache.IsFSResizeRequiredError(podExistErr) {
t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
@ -1299,7 +1253,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
go reconciler.Run(wait.NeverStop) go reconciler.Run(wait.NeverStop)
waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName) mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize)
return mounted && err == nil, nil return mounted && err == nil, nil
}) })
if waitErr != nil { if waitErr != nil {
@ -1311,6 +1265,69 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
} }
} }
func getTestPVC(pvName string, volumeMode *v1.PersistentVolumeMode, specSize, statusSize resource.Quantity) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: specSize,
},
},
VolumeName: pvName,
VolumeMode: volumeMode,
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: v1.ResourceList{
v1.ResourceStorage: statusSize,
},
},
}
return pvc
}
func getTestPV(pvName string, volumeMode *v1.PersistentVolumeMode, pvSize resource.Quantity) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
VolumeMode: volumeMode,
Capacity: v1.ResourceList{
v1.ResourceStorage: pvSize,
},
},
}
return pv
}
func getTestPod(claimName string) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
},
},
},
}
return pod
}
func Test_UncertainDeviceGlobalMounts(t *testing.T) { func Test_UncertainDeviceGlobalMounts(t *testing.T) {
var tests = []struct { var tests = []struct {
name string name string
@ -1791,7 +1808,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN
err := retryWithExponentialBackOff( err := retryWithExponentialBackOff(
testOperationBackOffDuration, testOperationBackOffDuration,
func() (bool, error) { func() (bool, error) {
mounted, _, err := asw.PodExistsInVolume(podName, volumeName) mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{})
if mounted || err != nil { if mounted || err != nil {
return false, nil return false, nil
} }

View File

@ -495,7 +495,6 @@ func (spec *Spec) IsKubeletExpandable() bool {
return spec.PersistentVolume.Spec.FlexVolume != nil return spec.PersistentVolume.Spec.FlexVolume != nil
default: default:
return false return false
} }
} }

View File

@ -175,6 +175,7 @@ type FakeVolumePlugin struct {
LastProvisionerOptions VolumeOptions LastProvisionerOptions VolumeOptions
NewAttacherCallCount int NewAttacherCallCount int
NewDetacherCallCount int NewDetacherCallCount int
NodeExpandCallCount int
VolumeLimits map[string]int64 VolumeLimits map[string]int64
VolumeLimitsError error VolumeLimitsError error
LimitKey string LimitKey string
@ -471,6 +472,7 @@ func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
} }
func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) { func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) {
plugin.NodeExpandCallCount++
if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName { if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName {
return false, volumetypes.NewFailedPreconditionError("volume-in-use") return false, volumetypes.NewFailedPreconditionError("volume-in-use")
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package operationexecutor package operationexecutor
import ( import (
"k8s.io/apimachinery/pkg/api/resource"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -108,7 +109,7 @@ func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeC
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
} }
func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil
} }

View File

@ -0,0 +1,151 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
type NodeExpander struct {
nodeResizeOperationOpts
kubeClient clientset.Interface
recorder record.EventRecorder
// computed via precheck
pvcStatusCap resource.Quantity
pvCap resource.Quantity
resizeStatus *v1.PersistentVolumeClaimResizeStatus
// pvcAlreadyUpdated if true indicates that although we are calling NodeExpandVolume on the kubelet
// PVC has already been updated - possibly because expansion already succeeded on different node.
// This can happen when a RWX PVC is expanded.
pvcAlreadyUpdated bool
}
func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander {
return &NodeExpander{
kubeClient: client,
nodeResizeOperationOpts: resizeOp,
recorder: recorder,
}
}
// testResponseData is merely used for doing sanity checks in unit tests
type testResponseData struct {
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalledOnPlugin bool
// Indicates whether kubelet should assume resize operation as finished.
// For kubelet - resize operation could be assumed as finished even if
// actual resizing is *not* finished. This can happen, because certain prechecks
// are failing and kubelet should not retry expansion, or it could happen
// because resize operation is genuinely finished.
assumeResizeFinished bool
}
// runPreCheck performs some sanity checks before expansion can be performed on the PVC.
func (ne *NodeExpander) runPreCheck() bool {
ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage]
ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage]
ne.resizeStatus = ne.pvc.Status.ResizeStatus
// PVC is already expanded but we are still trying to expand the volume because
// last recorded size in ASOW is older. This can happen for RWX volume types.
if ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0 && (ne.resizeStatus == nil || *ne.resizeStatus == v1.PersistentVolumeClaimNoExpansionInProgress) {
ne.pvcAlreadyUpdated = true
}
// if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we
// should allow volume expansion on the node to proceed. We are making an exception for
// resizeStatus being nil because it will support use cases where
// resizeStatus may not be set (old control-plane expansion controller etc).
if ne.resizeStatus == nil ||
ne.pvcAlreadyUpdated ||
*ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending ||
*ne.resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress {
return true
}
return false
}
func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
allowExpansion := ne.runPreCheck()
if !allowExpansion {
return false, nil, testResponseData{false, true}
}
var err error
nodeName := ne.vmt.Pod.Spec.NodeName
if !ne.pvcAlreadyUpdated {
ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient)
if err != nil {
msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error())
return false, err, testResponseData{}
}
}
_, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts)
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient)
if markFailedError != nil {
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName)
klog.Errorf(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
}
return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
}
simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod))
// no need to update PVC object if we already updated it
if ne.pvcAlreadyUpdated {
return true, nil, testResponseData{true, true}
}
// File system resize succeeded, now update the PVC's Capacity to match the PV's
ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient)
if err != nil {
return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), testResponseData{true, true}
}
return true, nil, testResponseData{true, true}
}

View File

@ -0,0 +1,154 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"testing"
)
func TestNodeExpander(t *testing.T) {
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
// desired size, defaults to pv.Spec.Capacity
desiredSize *resource.Quantity
// actualSize, defaults to pvc.Status.Capacity
actualSize *resource.Quantity
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
assumeResizeOpAsFinished bool
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize > actualSize",
pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
pvc := test.pvc
pv := test.pv
pod := getTestPod("test-pod", pvc.Name)
og := getTestOperationGenerator(volumePluginMgr, pvc, pv)
vmt := VolumeToMount{
Pod: pod,
VolumeName: v1.UniqueVolumeName(pv.Name),
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
}
desiredSize := test.desiredSize
if desiredSize == nil {
desiredSize = pv.Spec.Capacity.Storage()
}
actualSize := test.actualSize
if actualSize == nil {
actualSize = pvc.Status.Capacity.Storage()
}
resizeOp := nodeResizeOperationOpts{
pvc: pvc,
pv: pv,
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
pluginResizeOpts: volume.NodeResizeOptions{
VolumeSpec: vmt.VolumeSpec,
NewSize: *desiredSize,
OldSize: *actualSize,
},
}
ogInstance, _ := og.(*operationGenerator)
nodeExpander := newNodeExpander(resizeOp, ogInstance.kubeClient, ogInstance.recorder)
_, err, expansionResponse := nodeExpander.expandOnPlugin()
pvc = nodeExpander.pvc
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
if !test.expectError && err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, err)
}
if test.expectError && err == nil {
t.Errorf("For test %s, expected error but got none", test.name)
}
if test.expectResizeCall != expansionResponse.resizeCalledOnPlugin {
t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalledOnPlugin)
}
if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeFinished {
t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeFinished)
}
if test.expectedResizeStatus != *pvc.Status.ResizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
}
})
}
}

View File

@ -148,7 +148,7 @@ type OperationExecutor interface {
// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true // and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error
// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error) ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (*volume.Spec, error)
// CheckVolumeExistenceOperation checks volume existence // CheckVolumeExistenceOperation checks volume existence
@ -201,7 +201,7 @@ type ActualStateOfWorldMounterUpdater interface {
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
// Marks the specified volume's file system resize request is finished. // Marks the specified volume's file system resize request is finished.
MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool
// GetDeviceMountState returns mount state of the device in global path // GetDeviceMountState returns mount state of the device in global path
GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
@ -245,6 +245,11 @@ type ActualStateOfWorldAttacherUpdater interface {
// Unmarks the desire to detach for the specified volume (add the volume back to // Unmarks the desire to detach for the specified volume (add the volume back to
// the node's volumesToReportAsAttached list) // the node's volumesToReportAsAttached list)
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
// InitializeClaimSize sets pvc claim size by reading pvc.Status.Capacity
InitializeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity)
GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity
} }
// VolumeLogger defines a set of operations for generating volume-related logging and error msgs // VolumeLogger defines a set of operations for generating volume-related logging and error msgs
@ -420,6 +425,10 @@ type VolumeToMount struct {
// time at which volume was requested to be mounted // time at which volume was requested to be mounted
MountRequestTime time.Time MountRequestTime time.Time
// PersistentVolumeSize stores desired size of the volume.
// usually this is the size if pv.Spec.Capacity
PersistentVolumeSize resource.Quantity
} }
// DeviceMountState represents device mount state in a global path. // DeviceMountState represents device mount state in a global path.
@ -994,8 +1003,8 @@ func (oe *operationExecutor) UnmountDevice(
deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
} }
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error {
generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld) generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize)
if err != nil { if err != nil {
return err return err
} }

View File

@ -18,6 +18,7 @@ package operationexecutor
import ( import (
"fmt" "fmt"
"k8s.io/apimachinery/pkg/api/resource"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -668,7 +669,7 @@ func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.P
}, nil }, nil
} }
func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
opFunc := func() volumetypes.OperationContext { opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit) startOperationAndBlock(fopg.ch, fopg.quit)
return volumetypes.NewOperationContext(nil, nil, false) return volumetypes.NewOperationContext(nil, nil, false)

View File

@ -87,6 +87,16 @@ type operationGenerator struct {
translator InTreeToCSITranslator translator InTreeToCSITranslator
} }
type inTreeResizeResponse struct {
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
err error
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalled bool
}
// NewOperationGenerator is returns instance of operationGenerator // NewOperationGenerator is returns instance of operationGenerator
func NewOperationGenerator(kubeClient clientset.Interface, func NewOperationGenerator(kubeClient clientset.Interface,
volumePluginMgr *volume.VolumePluginMgr, volumePluginMgr *volume.VolumePluginMgr,
@ -150,7 +160,9 @@ type OperationGenerator interface {
GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error)
// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) // Along with volumeToMount and actualStateOfWorld, the function expects current size of volume on the node as an argument. The current
// size here always refers to capacity last recorded in actualStateOfWorld from pvc.Status.Capacity
GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error)
} }
type inTreeResizeOpts struct { type inTreeResizeOpts struct {
@ -161,27 +173,6 @@ type inTreeResizeOpts struct {
volumePlugin volume.ExpandableVolumePlugin volumePlugin volume.ExpandableVolumePlugin
} }
type inTreeResizeResponse struct {
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
err error
// Indicates whether kubelet should assume resize operation as finished.
// For kubelet - resize operation could be assumed as finished even if
// actual resizing is *not* finished. This can happen, because certain prechecks
// are failing and kubelet should not retry expansion, or it could happen
// because resize operation is genuinely finished.
assumeResizeOpAsFinished bool
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalled bool
// indicates whether entire volume expansion is finished or not
// only used from nodeExpansion calls. Mainly used for testing.
resizeFinished bool
}
type nodeResizeOperationOpts struct { type nodeResizeOperationOpts struct {
vmt VolumeToMount vmt VolumeToMount
pvc *v1.PersistentVolumeClaim pvc *v1.PersistentVolumeClaim
@ -712,7 +703,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod)) klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
resizeOptions.DeviceMountPath = volumeMounter.GetPath() resizeOptions.DeviceMountPath = volumeMounter.GetPath()
_, resizeError = og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) _, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil { if resizeError != nil {
klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError) klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
@ -1205,7 +1196,7 @@ func (og *operationGenerator) GenerateMapVolumeFunc(
DevicePath: devicePath, DevicePath: devicePath,
DeviceStagePath: stagingPath, DeviceStagePath: stagingPath,
} }
_, resizeError := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions) _, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
if resizeError != nil { if resizeError != nil {
klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError) klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
@ -1491,6 +1482,22 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName)
// only fetch claimSize if it was not set previously
if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil {
pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
pvcStatusSize := pvc.Status.Capacity.Storage()
if pvcStatusSize != nil {
claimSize = pvcStatusSize
}
}
if !volumeToMount.PluginIsAttachable { if !volumeToMount.PluginIsAttachable {
// If the volume does not implement the attacher interface, it is // If the volume does not implement the attacher interface, it is
// assumed to be attached and the actual state of the world is // assumed to be attached and the actual state of the world is
@ -1503,7 +1510,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
} }
actualStateOfWorld.InitializeClaimSize(volumeToMount.VolumeName, claimSize)
return volumetypes.NewOperationContext(nil, nil, migrated) return volumetypes.NewOperationContext(nil, nil, migrated)
} }
@ -1544,6 +1551,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
} }
actualStateOfWorld.InitializeClaimSize(volumeToMount.VolumeName, claimSize)
return volumetypes.NewOperationContext(nil, nil, migrated) return volumetypes.NewOperationContext(nil, nil, migrated)
} }
} }
@ -1894,7 +1902,7 @@ func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolu
func (og *operationGenerator) GenerateExpandInUseVolumeFunc( func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
volumeToMount VolumeToMount, volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
volumePlugin, err := volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
@ -1907,9 +1915,17 @@ func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
var eventErr, detailedErr error var eventErr, detailedErr error
migrated := false migrated := false
if currentSize.IsZero() || volumeToMount.PersistentVolumeSize.IsZero() {
err := fmt.Errorf("current or new size of the volume is not set")
eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}
resizeOptions := volume.NodeResizeOptions{ resizeOptions := volume.NodeResizeOptions{
VolumeSpec: volumeToMount.VolumeSpec, VolumeSpec: volumeToMount.VolumeSpec,
DevicePath: volumeToMount.DevicePath, DevicePath: volumeToMount.DevicePath,
OldSize: currentSize,
NewSize: volumeToMount.PersistentVolumeSize,
} }
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec) fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
if err != nil { if err != nil {
@ -2011,10 +2027,11 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, e1, e2 return false, e1, e2
} }
if resizeDone { if resizeDone {
markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize)
if markFSResizedErr != nil { if !markingDone {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr) genericFailureError := fmt.Errorf("unable to mark volume as resized")
e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError)
return false, e1, e2 return false, e1, e2
} }
return true, nil, nil return true, nil, nil
@ -2022,25 +2039,9 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, nil, nil return false, nil, nil
} }
func (og *operationGenerator) nodeExpandVolume( func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) {
volumeToMount VolumeToMount, supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
actualStateOfWorld ActualStateOfWorldMounterUpdater, if supportsExpansion {
rsOpts volume.NodeResizeOptions) (bool, error) {
if volumeToMount.VolumeSpec != nil &&
volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
return true, nil
}
// Get expander, if possible
expandableVolumePlugin, _ :=
og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
if expandableVolumePlugin != nil &&
expandableVolumePlugin.RequiresFSResize() &&
volumeToMount.VolumeSpec.PersistentVolume != nil {
pv := volumeToMount.VolumeSpec.PersistentVolume pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil { if err != nil {
@ -2055,193 +2056,146 @@ func (og *operationGenerator) nodeExpandVolume(
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg) og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil return true, nil
} }
resizeOp := nodeResizeOperationOpts{ pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
vmt: volumeToMount, pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
pvc: pvc, if pvcStatusCap.Cmp(pvSpecCap) < 0 {
pv: pv, rsOpts.NewSize = pvSpecCap
pluginResizeOpts: rsOpts, rsOpts.OldSize = pvcStatusCap
volumePlugin: expandableVolumePlugin, resizeOp := nodeResizeOperationOpts{
actualStateOfWorld: actualStateOfWorld, vmt: volumeToMount,
} pvc: pvc,
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) { pv: pv,
resizeResponse := og.callNodeExpandOnPlugin(resizeOp) pluginResizeOpts: rsOpts,
return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err volumePlugin: expandablePlugin,
} else { actualStateOfWorld: actualStateOfWorld,
return og.legacyCallNodeExpandOnPlugin(resizeOp) }
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin()
return resizeFinished, err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
}
} }
} }
return true, nil return true, nil
} }
// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) {
// recovery from volume expansion failure. if volumeToMount.VolumeSpec != nil &&
func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse { volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
pvc := resizeOp.pvc klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
pv := resizeOp.pv return false, nil
volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin
var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
resizeResponse := inTreeResizeResponse{
pvc: pvc,
pv: pv,
} }
if permitNodeExpansion(pvc, pv) { // Get expander, if possible
// File system resize was requested, proceed expandableVolumePlugin, _ :=
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod)) og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
if expandableVolumePlugin != nil &&
rsOpts.VolumeSpec = volumeToMount.VolumeSpec expandableVolumePlugin.RequiresFSResize() &&
rsOpts.NewSize = pvSpecCap volumeToMount.VolumeSpec.PersistentVolume != nil {
rsOpts.OldSize = pvcStatusCap return true, expandableVolumePlugin
pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient)
if err != nil {
msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error())
resizeResponse.err = msg
return resizeResponse
}
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
resizeResponse.resizeCalled = true
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient)
// update the pvc with node expansion object
resizeResponse.pvc = pvc
resizeResponse.assumeResizeOpAsFinished = true
if markFailedError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
}
resizeResponse.err = resizeErr
return resizeResponse
}
resizeResponse.resizeFinished = resizeDone
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return resizeResponse
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
resizeResponse.pvc = pvc
if err != nil {
resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
// On retry, NodeExpandVolume will be called again but do nothing
return resizeResponse
}
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
} }
// somehow a resize operation was queued, but we can not perform any resizing because return false, nil
// prechecks required for node expansion failed. Kubelet should not retry expanding the volume. }
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse func (og *operationGenerator) nodeExpandVolume(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
rsOpts volume.NodeResizeOptions) (bool, error) {
supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
if supportsExpansion {
// lets use sizes handed over to us by caller for comparison
if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 {
pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
// Return error rather than leave the file system un-resized, caller will log and retry
return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
}
if volumeToMount.VolumeSpec.ReadOnly {
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil
}
resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount,
pvc: pvc,
pv: pv,
pluginResizeOpts: rsOpts,
volumePlugin: expandableVolumePlugin,
actualStateOfWorld: actualStateOfWorld,
}
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
resizeFinished, err, _ := nodeExpander.expandOnPlugin()
return resizeFinished, err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
}
}
}
return true, nil
} }
// legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support // legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support
// recovery from volume expansion failure // recovery from volume expansion failure
// TODO: Removing this code when RecoverVolumeExpansionFailure feature goes GA.
func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) { func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) {
pvc := resizeOp.pvc pvc := resizeOp.pvc
pv := resizeOp.pv
volumeToMount := resizeOp.vmt volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin expandableVolumePlugin := resizeOp.volumePlugin
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
nodeName := volumeToMount.Pod.Spec.NodeName
var err error var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] // File system resize was requested, proceed
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage] klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
if pvcStatusCap.Cmp(pvSpecCap) < 0 {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
rsOpts.VolumeSpec = volumeToMount.VolumeSpec rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap _, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts) if resizeErr != nil {
if resizeErr != nil { // if driver returned FailedPrecondition error that means
// if driver returned FailedPrecondition error that means // volume expansion should not be retried on this node but
// volume expansion should not be retried on this node but // expansion operation should not block mounting
// expansion operation should not block mounting if volumetypes.IsFailedPreconditionError(resizeErr) {
if volumetypes.IsFailedPreconditionError(resizeErr) { actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName) klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error())
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error()) return true, nil
return true, nil
}
return false, resizeErr
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return false, nil
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
_, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
} }
return false, resizeErr
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// if PVC already has new size, there is no need to update it.
if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 {
return true, nil return true, nil
} }
// File system resize succeeded, now update the PVC's Capacity to match the PV's
_, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
}
return true, nil return true, nil
} }
func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
// if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded
if pvcStatusCap.Cmp(pvSpecCap) >= 0 {
return false
}
resizeStatus := pvc.Status.ResizeStatus
// if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on
// the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where
// resizeStatus may not be set (old control-plane expansion controller etc).
if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress {
return true
} else {
klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus)
return false
}
}
func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec) mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)

View File

@ -17,9 +17,11 @@ limitations under the License.
package operationexecutor package operationexecutor
import ( import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
@ -209,101 +211,130 @@ func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) {
} }
} }
func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) { func TestOperationGenerator_nodeExpandVolume(t *testing.T) {
var tests = []struct { getSizeFunc := func(size string) *resource.Quantity {
name string x := resource.MustParse(size)
pvc *v1.PersistentVolumeClaim return &x
pv *v1.PersistentVolume
recoverFeatureGate bool
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
assumeResizeOpAsFinished bool
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
recoverFeatureGate: true,
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
} }
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
// desired size, defaults to pv.Spec.Capacity
desiredSize *resource.Quantity
// actualSize, defaults to pvc.Status.Capacity
actualSize *resource.Quantity
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
resizeCallCount int
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
resizeCallCount: 0,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
resizeCallCount: 1,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
resizeCallCount: 1,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize = actualSize",
pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "2G"),
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
resizeCallCount: 0,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap = pvc.status.cap, resizeStatus='', desiredSize > actualSize",
pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "2G"),
desiredSize: getSizeFunc("2G"),
actualSize: getSizeFunc("1G"),
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
resizeCallCount: 1,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap = pvc.status.cap, resizeStatus=node-expansion-failed, desiredSize > actualSize",
pvc: getTestPVC("test-vol0", "2G", "2G", "2G", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
desiredSize: getSizeFunc("2G"),
actualSize: getSizeFunc("1G"),
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
resizeCallCount: 0,
expectedStatusSize: resource.MustParse("2G"),
},
}
for i := range tests { for i := range tests {
test := tests[i] test := tests[i]
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, true)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t) volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
test.pv.Spec.ClaimRef = &v1.ObjectReference{
Namespace: test.pvc.Namespace,
Name: test.pvc.Name,
}
pvc := test.pvc pvc := test.pvc
pv := test.pv pv := test.pv
pod := getTestPod("test-pod", pvc.Name) pod := getTestPod("test-pod", pvc.Name)
og := getTestOperationGenerator(volumePluginMgr, pvc, pv) og := getTestOperatorGeneratorWithPVPVC(volumePluginMgr, pvc, pv)
vmt := VolumeToMount{ vmt := VolumeToMount{
Pod: pod, Pod: pod,
VolumeName: v1.UniqueVolumeName(pv.Name), VolumeName: v1.UniqueVolumeName(pv.Name),
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false), VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
} }
resizeOp := nodeResizeOperationOpts{ desiredSize := test.desiredSize
pvc: pvc, if desiredSize == nil {
pv: pv, desiredSize = pv.Spec.Capacity.Storage()
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
} }
actualSize := test.actualSize
if actualSize == nil {
actualSize = pvc.Status.Capacity.Storage()
}
pluginResizeOpts := volume.NodeResizeOptions{
VolumeSpec: vmt.VolumeSpec,
NewSize: *desiredSize,
OldSize: *actualSize,
}
ogInstance, _ := og.(*operationGenerator) ogInstance, _ := og.(*operationGenerator)
expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp) _, err := ogInstance.nodeExpandVolume(vmt, nil, pluginResizeOpts)
pvc = expansionResponse.pvc if !test.expectError && err != nil {
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage] t.Errorf("For test %s, expected no error got: %v", test.name, err)
if !test.expectError && expansionResponse.err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err)
} }
if test.expectError && expansionResponse.err == nil { if test.expectError && err == nil {
t.Errorf("For test %s, expected error but got none", test.name) t.Errorf("For test %s, expected error but got none", test.name)
} }
if test.resizeCallCount != fakePlugin.NodeExpandCallCount {
if test.expectResizeCall != expansionResponse.resizeCalled { t.Errorf("for test %s, expected node-expand call count to be %d, got %d", test.name, test.resizeCallCount, fakePlugin.NodeExpandCallCount)
t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled)
}
if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished {
t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished)
}
if test.expectedResizeStatus != *pvc.Status.ResizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
} }
}) })
} }
@ -353,9 +384,7 @@ func getTestPVC(volumeName string, specSize, statusSize, allocatedSize string, r
if len(allocatedSize) > 0 { if len(allocatedSize) > 0 {
pvc.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: resource.MustParse(allocatedSize)} pvc.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: resource.MustParse(allocatedSize)}
} }
if len(resizeStatus) > 0 { pvc.Status.ResizeStatus = &resizeStatus
pvc.Status.ResizeStatus = &resizeStatus
}
return pvc return pvc
} }
@ -423,6 +452,31 @@ func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr, objects
return operationGenerator return operationGenerator
} }
func getTestOperatorGeneratorWithPVPVC(volumePluginMgr *volume.VolumePluginMgr, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) OperationGenerator {
fakeKubeClient := fakeclient.NewSimpleClientset(pvc, pv)
fakeKubeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, pvc, nil
})
fakeKubeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
return true, pv, nil
})
fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
return true, pvc, nil
}
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
operationGenerator := NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler)
return operationGenerator
}
func getTestVolumeToUnmount(pod *v1.Pod, pvSpec v1.PersistentVolumeSpec, pluginName string) MountedVolume { func getTestVolumeToUnmount(pod *v1.Pod, pvSpec v1.PersistentVolumeSpec, pluginName string) MountedVolume {
volumeSpec := &volume.Spec{ volumeSpec := &volume.Spec{
PersistentVolume: &v1.PersistentVolume{ PersistentVolume: &v1.PersistentVolume{