diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 63965511ef9..7e0b0f8f94b 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -99,6 +99,11 @@ const ( // Ability to Expand persistent volumes ExpandPersistentVolumes utilfeature.Feature = "ExpandPersistentVolumes" + // owner: @mlmhl + // alpha: v1.11 + // Ability to expand persistent volumes' file system without unmounting volumes. + ExpandPersistentVolumesFSWithoutUnmounting utilfeature.Feature = "ExpandPersistentVolumesFSWithoutUnmounting" + // owner: @verb // alpha: v1.10 // @@ -328,28 +333,29 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS MountPropagation: {Default: true, PreRelease: utilfeature.Beta}, QOSReserved: {Default: false, PreRelease: utilfeature.Alpha}, ExpandPersistentVolumes: {Default: true, PreRelease: utilfeature.Beta}, - CPUManager: {Default: true, PreRelease: utilfeature.Beta}, - ServiceNodeExclusion: {Default: false, PreRelease: utilfeature.Alpha}, - MountContainers: {Default: false, PreRelease: utilfeature.Alpha}, - VolumeScheduling: {Default: true, PreRelease: utilfeature.Beta}, - CSIPersistentVolume: {Default: true, PreRelease: utilfeature.Beta}, - CustomPodDNS: {Default: true, PreRelease: utilfeature.Beta}, - BlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, - StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA}, - ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha}, - SupportIPVSProxyMode: {Default: true, PreRelease: utilfeature.GA}, - SupportPodPidsLimit: {Default: false, PreRelease: utilfeature.Alpha}, - HyperVContainer: {Default: false, PreRelease: utilfeature.Alpha}, - ScheduleDaemonSetPods: {Default: false, PreRelease: utilfeature.Alpha}, - TokenRequest: {Default: false, PreRelease: utilfeature.Alpha}, - TokenRequestProjection: {Default: false, PreRelease: utilfeature.Alpha}, - CRIContainerLogRotation: {Default: true, PreRelease: utilfeature.Beta}, - GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta}, - RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha}, - VolumeSubpath: {Default: true, PreRelease: utilfeature.GA}, - BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha}, - DynamicProvisioningScheduling: {Default: false, PreRelease: utilfeature.Alpha}, - VolumeSubpathEnvExpansion: {Default: false, PreRelease: utilfeature.Alpha}, + ExpandPersistentVolumesFSWithoutUnmounting: {Default: false, PreRelease: utilfeature.Alpha}, + CPUManager: {Default: true, PreRelease: utilfeature.Beta}, + ServiceNodeExclusion: {Default: false, PreRelease: utilfeature.Alpha}, + MountContainers: {Default: false, PreRelease: utilfeature.Alpha}, + VolumeScheduling: {Default: true, PreRelease: utilfeature.Beta}, + CSIPersistentVolume: {Default: true, PreRelease: utilfeature.Beta}, + CustomPodDNS: {Default: true, PreRelease: utilfeature.Beta}, + BlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, + StorageObjectInUseProtection: {Default: true, PreRelease: utilfeature.GA}, + ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha}, + SupportIPVSProxyMode: {Default: true, PreRelease: utilfeature.GA}, + SupportPodPidsLimit: {Default: false, PreRelease: utilfeature.Alpha}, + HyperVContainer: {Default: false, PreRelease: utilfeature.Alpha}, + ScheduleDaemonSetPods: {Default: false, PreRelease: utilfeature.Alpha}, + TokenRequest: {Default: false, PreRelease: utilfeature.Alpha}, + TokenRequestProjection: {Default: false, PreRelease: utilfeature.Alpha}, + CRIContainerLogRotation: {Default: true, PreRelease: utilfeature.Beta}, + GCERegionalPersistentDisk: {Default: true, PreRelease: utilfeature.Beta}, + RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha}, + VolumeSubpath: {Default: true, PreRelease: utilfeature.GA}, + BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha}, + DynamicProvisioningScheduling: {Default: false, PreRelease: utilfeature.Alpha}, + VolumeSubpathEnvExpansion: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/volumemanager/cache/BUILD b/pkg/kubelet/volumemanager/cache/BUILD index bac59a12bd8..c79fad51379 100644 --- a/pkg/kubelet/volumemanager/cache/BUILD +++ b/pkg/kubelet/volumemanager/cache/BUILD @@ -14,6 +14,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache", deps = [ + "//pkg/features:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", @@ -21,6 +22,7 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 6ba3cd0dfda..5b375e780c7 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -28,6 +28,8 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" @@ -148,6 +150,11 @@ type ActualStateOfWorld interface { // with pod's unique name. This map can be used to determine which pod is currently // in actual state of world. GetPods() map[volumetypes.UniquePodName]bool + + // MarkFSResizeRequired marks each volume that is successfully attached and + // mounted for the specified pod as requiring file system resize (if the plugin for the + // volume indicates it requires file system resize). + MarkFSResizeRequired(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) } // MountedVolume represents a volume that has successfully been mounted to a pod. @@ -291,6 +298,10 @@ type mountedPod struct { // volumeGidValue contains the value of the GID annotation, if present. volumeGidValue string + + // fsResizeRequired indicates the underlying volume has been successfully + // mounted to this pod but its size has been expanded after that. + fsResizeRequired bool } func (asw *actualStateOfWorld) MarkVolumeAsAttached( @@ -444,6 +455,34 @@ func (asw *actualStateOfWorld) AddPodToVolume( return nil } +func (asw *actualStateOfWorld) MarkVolumeAsResized( + podName volumetypes.UniquePodName, + volumeName v1.UniqueVolumeName) error { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if !volumeExists { + return fmt.Errorf( + "no volume with the name %q exists in the list of attached volumes", + volumeName) + } + + podObj, podExists := volumeObj.mountedPods[podName] + if !podExists { + return fmt.Errorf( + "no pod with the name %q exists in the mounted pods list of volume %s", + podName, + volumeName) + } + + glog.V(5).Infof("Volume %s(OuterVolumeSpecName %s) of pod %s has been resized", + volumeName, podObj.outerVolumeSpecName, podName) + podObj.fsResizeRequired = false + asw.attachedVolumes[volumeName].mountedPods[podName] = podObj + return nil +} + func (asw *actualStateOfWorld) MarkRemountRequired( podName volumetypes.UniquePodName) { asw.Lock() @@ -475,6 +514,46 @@ func (asw *actualStateOfWorld) MarkRemountRequired( } } +func (asw *actualStateOfWorld) MarkFSResizeRequired( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName) { + asw.Lock() + defer asw.Unlock() + volumeObj, exist := asw.attachedVolumes[volumeName] + if !exist { + glog.Warningf("MarkFSResizeRequired for volume %s failed as volume not exist", volumeName) + return + } + + podObj, exist := volumeObj.mountedPods[podName] + if !exist { + glog.Warningf("MarkFSResizeRequired for volume %s failed "+ + "as pod(%s) not exist", volumeName, podName) + return + } + + volumePlugin, err := + asw.volumePluginMgr.FindExpandablePluginBySpec(podObj.volumeSpec) + if err != nil || volumePlugin == nil { + // Log and continue processing + glog.Errorf( + "MarkFSResizeRequired failed to find expandable plugin for pod %q volume: %q (volSpecName: %q)", + podObj.podName, + volumeObj.volumeName, + podObj.volumeSpec.Name()) + return + } + + if volumePlugin.RequiresFSResize() { + if !podObj.fsResizeRequired { + glog.V(3).Infof("PVC volume %s(OuterVolumeSpecName %s) of pod %s requires file system resize", + volumeName, podObj.outerVolumeSpecName, podName) + podObj.fsResizeRequired = true + } + asw.attachedVolumes[volumeName].mountedPods[podName] = podObj + } +} + func (asw *actualStateOfWorld) SetVolumeGloballyMounted( volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error { asw.Lock() @@ -546,8 +625,14 @@ func (asw *actualStateOfWorld) PodExistsInVolume( } podObj, podExists := volumeObj.mountedPods[podName] - if podExists && podObj.remountRequired { - return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) + if podExists { + if podObj.remountRequired { + return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) + } + if podObj.fsResizeRequired && + utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { + return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) + } } return podExists, volumeObj.devicePath, nil @@ -716,6 +801,35 @@ func newRemountRequiredError( } } +// fsResizeRequiredError is an error returned when PodExistsInVolume() found +// volume/pod attached/mounted but fsResizeRequired was true, indicating the +// given volume receives an resize request after attached/mounted. +type fsResizeRequiredError struct { + volumeName v1.UniqueVolumeName + podName volumetypes.UniquePodName +} + +func (err fsResizeRequiredError) Error() string { + return fmt.Sprintf( + "volumeName %q mounted to %q needs to resize file system", + err.volumeName, err.podName) +} + +func newFsResizeRequiredError( + volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) error { + return fsResizeRequiredError{ + volumeName: volumeName, + podName: podName, + } +} + +// IsFSResizeRequiredError returns true if the specified error is a +// fsResizeRequiredError. +func IsFSResizeRequiredError(err error) bool { + _, ok := err.(fsResizeRequiredError) + return ok +} + // getMountedVolume constructs and returns a MountedVolume object from the given // mountedPod and attachedVolume objects. func getMountedVolume( diff --git a/pkg/kubelet/volumemanager/populator/BUILD b/pkg/kubelet/volumemanager/populator/BUILD index c44312cb7f0..fe4d2da5a45 100644 --- a/pkg/kubelet/volumemanager/populator/BUILD +++ b/pkg/kubelet/volumemanager/populator/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", @@ -49,6 +50,7 @@ go_test( srcs = ["desired_state_of_world_populator_test.go"], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/kubelet/configmap:go_default_library", "//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/pod:go_default_library", @@ -61,6 +63,7 @@ go_test( "//pkg/volume/util:go_default_library", "//pkg/volume/util/types:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index cc642511a72..f4894d8e92d 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -30,6 +30,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" @@ -182,12 +183,26 @@ func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool { // Iterate through all pods and add to desired state of world if they don't // exist but should func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { + // Map unique pod name to outer volume name to MountedVolume. + mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume) + if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { + for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() { + mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName] + if !exist { + mountedVolumes = make(map[string]cache.MountedVolume) + mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes + } + mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume + } + } + + processedVolumesForFSResize := sets.NewString() for _, pod := range dswp.podManager.GetPods() { if dswp.isPodTerminated(pod) { // Do not (re)add volumes for terminated pods continue } - dswp.processPodVolumes(pod) + dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize) } } @@ -259,7 +274,10 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { // processPodVolumes processes the volumes in the given pod and adds them to the // desired state of the world. -func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { +func (dswp *desiredStateOfWorldPopulator) processPodVolumes( + pod *v1.Pod, + mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, + processedVolumesForFSResize sets.String) { if pod == nil { return } @@ -274,7 +292,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { // Process volume spec for each volume defined in pod for _, podVolume := range pod.Spec.Volumes { - volumeSpec, volumeGidValue, err := + pvc, volumeSpec, volumeGidValue, err := dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mountsMap, devicesMap) if err != nil { glog.Errorf( @@ -304,6 +322,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { podVolume.Name, volumeSpec.Name(), uniquePodName) + + if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { + dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, + uniquePodName, mountedVolumesForPod, processedVolumesForFSResize) + } } // some of the volume additions may have failed, should not mark this pod as fully processed @@ -316,6 +339,106 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) { } +// checkVolumeFSResize checks whether a PVC mounted by the pod requires file +// system resize or not. If so, marks this volume as fsResizeRequired in ASW. +// - mountedVolumesForPod stores all mounted volumes in ASW, because online +// volume resize only considers mounted volumes. +// - processedVolumesForFSResize stores all volumes we have checked in current loop, +// because file system resize operation is a global operation for volume, so +// we only need to check it once if more than one pod use it. +func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize( + pod *v1.Pod, + podVolume v1.Volume, + pvc *v1.PersistentVolumeClaim, + volumeSpec *volume.Spec, + uniquePodName volumetypes.UniquePodName, + mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume, + processedVolumesForFSResize sets.String) { + if podVolume.PersistentVolumeClaim == nil { + // Only PVC supports resize operation. + return + } + uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod) + if !exist { + // Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize, + // it will be handled as offline resize(if it indeed hasn't been mounted yet), + // or online resize in subsequent loop(after we confirm it has been mounted). + return + } + fsVolume, err := util.CheckVolumeModeFilesystem(volumeSpec) + if err != nil { + glog.Errorf("Check volume mode failed for volume %s(OuterVolumeSpecName %s): %v", + uniqueVolumeName, podVolume.Name, err) + return + } + if !fsVolume { + glog.V(5).Infof("Block mode volume needn't to check file system resize request") + return + } + if processedVolumesForFSResize.Has(string(uniqueVolumeName)) { + // File system resize operation is a global operation for volume, + // so we only need to check it once if more than one pod use it. + return + } + if mountedReadOnlyByPod(podVolume, pod) { + // This volume is used as read only by this pod, we don't perform resize for read only volumes. + glog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+ + "as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name) + return + } + if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) { + dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName) + } + processedVolumesForFSResize.Insert(string(uniqueVolumeName)) +} + +func mountedReadOnlyByPod(podVolume v1.Volume, pod *v1.Pod) bool { + if podVolume.PersistentVolumeClaim.ReadOnly { + return true + } + for _, container := range pod.Spec.InitContainers { + if !mountedReadOnlyByContainer(podVolume.Name, &container) { + return false + } + } + for _, container := range pod.Spec.Containers { + if !mountedReadOnlyByContainer(podVolume.Name, &container) { + return false + } + } + return true +} + +func mountedReadOnlyByContainer(volumeName string, container *v1.Container) bool { + for _, volumeMount := range container.VolumeMounts { + if volumeMount.Name == volumeName && !volumeMount.ReadOnly { + return false + } + } + return true +} + +func getUniqueVolumeName( + podName volumetypes.UniquePodName, + outerVolumeSpecName string, + mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) (v1.UniqueVolumeName, bool) { + mountedVolumes, exist := mountedVolumesForPod[podName] + if !exist { + return "", false + } + mountedVolume, exist := mountedVolumes[outerVolumeSpecName] + if !exist { + return "", false + } + return mountedVolume.VolumeName, true +} + +func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { + capacity := pvc.Status.Capacity[v1.ResourceStorage] + requested := pv.Spec.Capacity[v1.ResourceStorage] + return requested.Cmp(capacity) > 0 +} + // podPreviouslyProcessed returns true if the volumes for this pod have already // been processed by the populator func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed( @@ -350,7 +473,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod( // specified volume. It dereference any PVC to get PV objects, if needed. // Returns an error if unable to obtain the volume at this time. func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( - podVolume v1.Volume, podName string, podNamespace string, mountsMap map[string]bool, devicesMap map[string]bool) (*volume.Spec, string, error) { + podVolume v1.Volume, podName string, podNamespace string, mountsMap map[string]bool, devicesMap map[string]bool) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) { if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { glog.V(5).Infof( @@ -359,15 +482,16 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( pvcSource.ClaimName) // If podVolume is a PVC, fetch the real PV behind the claim - pvName, pvcUID, err := dswp.getPVCExtractPV( + pvc, err := dswp.getPVCExtractPV( podNamespace, pvcSource.ClaimName) if err != nil { - return nil, "", fmt.Errorf( + return nil, nil, "", fmt.Errorf( "error processing PVC %q/%q: %v", podNamespace, pvcSource.ClaimName, err) } + pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID glog.V(5).Infof( "Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q", @@ -380,7 +504,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( volumeSpec, volumeGidValue, err := dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID) if err != nil { - return nil, "", fmt.Errorf( + return nil, nil, "", fmt.Errorf( "error processing PVC %q/%q: %v", podNamespace, pvcSource.ClaimName, @@ -399,11 +523,11 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { volumeMode, err := util.GetVolumeMode(volumeSpec) if err != nil { - return nil, "", err + return nil, nil, "", err } // Error if a container has volumeMounts but the volumeMode of PVC isn't Filesystem if mountsMap[podVolume.Name] && volumeMode != v1.PersistentVolumeFilesystem { - return nil, "", fmt.Errorf( + return nil, nil, "", fmt.Errorf( "Volume %q has volumeMode %q, but is specified in volumeMounts for pod %q/%q", podVolume.Name, volumeMode, @@ -412,7 +536,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( } // Error if a container has volumeDevices but the volumeMode of PVC isn't Block if devicesMap[podVolume.Name] && volumeMode != v1.PersistentVolumeBlock { - return nil, "", fmt.Errorf( + return nil, nil, "", fmt.Errorf( "Volume %q has volumeMode %q, but is specified in volumeDevices for pod %q/%q", podVolume.Name, volumeMode, @@ -420,13 +544,13 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( podName) } } - return volumeSpec, volumeGidValue, nil + return pvc, volumeSpec, volumeGidValue, nil } // Do not return the original volume object, since the source could mutate it clonedPodVolume := podVolume.DeepCopy() - return volume.NewSpecFromVolume(clonedPodVolume), "", nil + return nil, volume.NewSpecFromVolume(clonedPodVolume), "", nil } // getPVCExtractPV fetches the PVC object with the given namespace and name from @@ -434,11 +558,11 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( // it is pointing to and returns it. // An error is returned if the PVC object's phase is not "Bound". func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( - namespace string, claimName string) (string, types.UID, error) { + namespace string, claimName string) (*v1.PersistentVolumeClaim, error) { pvc, err := dswp.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{}) if err != nil || pvc == nil { - return "", "", fmt.Errorf( + return nil, fmt.Errorf( "failed to fetch PVC %s/%s from API server. err=%v", namespace, claimName, @@ -455,7 +579,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( // It should happen only in very rare case when scheduler schedules // a pod and user deletes a PVC that's used by it at the same time. if pvc.ObjectMeta.DeletionTimestamp != nil { - return "", "", fmt.Errorf( + return nil, fmt.Errorf( "can't start pod because PVC %s/%s is being deleted", namespace, claimName) @@ -464,7 +588,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { - return "", "", fmt.Errorf( + return nil, fmt.Errorf( "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", namespace, claimName, @@ -472,7 +596,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV( pvc.Spec.VolumeName) } - return pvc.Spec.VolumeName, pvc.UID, nil + return pvc, nil } // getPVSpec fetches the PV object with the given name from the API server diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 1ecf9674dd8..5acd7df04d9 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -20,12 +20,16 @@ import ( "testing" "time" + "fmt" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/configmap" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" @@ -294,7 +298,7 @@ func TestCreateVolumeSpec_Valid_File_VolumeMounts(t *testing.T) { fakePodManager.AddPod(pod) mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) - volumeSpec, _, err := + _, volumeSpec, _, err := dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) // Assert @@ -343,7 +347,7 @@ func TestCreateVolumeSpec_Valid_Block_VolumeDevices(t *testing.T) { fakePodManager.AddPod(pod) mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) - volumeSpec, _, err := + _, volumeSpec, _, err := dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) // Assert @@ -395,7 +399,7 @@ func TestCreateVolumeSpec_Invalid_File_VolumeDevices(t *testing.T) { fakePodManager.AddPod(pod) mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) - volumeSpec, _, err := + _, volumeSpec, _, err := dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) // Assert @@ -447,7 +451,7 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { fakePodManager.AddPod(pod) mountsMap, devicesMap := dswp.makeVolumeMap(pod.Spec.Containers) - volumeSpec, _, err := + _, volumeSpec, _, err := dswp.createVolumeSpec(pod.Spec.Volumes[0], pod.Name, pod.Namespace, mountsMap, devicesMap) // Assert @@ -459,6 +463,155 @@ func TestCreateVolumeSpec_Invalid_Block_VolumeMounts(t *testing.T) { utilfeature.DefaultFeatureGate.Set("BlockVolume=false") } +func TestCheckVolumeFSResize(t *testing.T) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dswp-test-volume-name", + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{RBD: &v1.RBDPersistentVolumeSource{}}, + Capacity: volumeCapacity(1), + ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"}, + }, + } + pvc := &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "dswp-test-volume-name", + Resources: v1.ResourceRequirements{ + Requests: volumeCapacity(1), + }, + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + Capacity: volumeCapacity(1), + }, + } + dswp, fakePodManager, fakeDSW := createDswpWithVolume(t, pv, pvc) + fakeASW := dswp.actualStateOfWorld + + // create pod + containers := []v1.Container{ + { + VolumeMounts: []v1.VolumeMount{ + { + Name: "dswp-test-volume-name", + MountPath: "/mnt", + }, + }, + }, + } + pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) + uniquePodName := types.UniquePodName(pod.UID) + uniqueVolumeName := v1.UniqueVolumeName("fake-plugin/" + pod.Spec.Volumes[0].Name) + + fakePodManager.AddPod(pod) + // Fill the dsw to contains volumes and pods. + dswp.findAndAddNewPods() + reconcileASW(fakeASW, fakeDSW, t) + + // No resize request for volume, volumes in ASW shouldn't be marked as fsResizeRequired. + setExpandOnlinePersistentVolumesFeatureGate("true", t) + resizeRequiredVolumes := reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + if len(resizeRequiredVolumes) > 0 { + t.Fatalf("No resize request for any volumes, but found resize required volumes in ASW: %v", resizeRequiredVolumes) + } + + // Add a resize request to volume. + pv.Spec.Capacity = volumeCapacity(2) + pvc.Spec.Resources.Requests = volumeCapacity(2) + + // Disable the feature gate, so volume shouldn't be marked as fsResizeRequired. + setExpandOnlinePersistentVolumesFeatureGate("false", t) + resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + if len(resizeRequiredVolumes) > 0 { + t.Fatalf("Feature gate disabled, but found resize required volumes in ASW: %v", resizeRequiredVolumes) + } + + // Make volume used as ReadOnly, so volume shouldn't be marked as fsResizeRequired. + setExpandOnlinePersistentVolumesFeatureGate("true", t) + pod.Spec.Containers[0].VolumeMounts[0].ReadOnly = true + resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + if len(resizeRequiredVolumes) > 0 { + t.Fatalf("volume mounted as ReadOnly, but found resize required volumes in ASW: %v", resizeRequiredVolumes) + } + + // Clear ASW, so volume shouldn't be marked as fsResizeRequired because they are not mounted. + pod.Spec.Containers[0].VolumeMounts[0].ReadOnly = false + clearASW(fakeASW, fakeDSW, t) + resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + if len(resizeRequiredVolumes) > 0 { + t.Fatalf("volume hasn't been mounted, but found resize required volumes in ASW: %v", resizeRequiredVolumes) + } + + // volume in ASW should be marked as fsResizeRequired. + reconcileASW(fakeASW, fakeDSW, t) + resizeRequiredVolumes = reprocess(dswp, uniquePodName, fakeDSW, fakeASW) + if len(resizeRequiredVolumes) == 0 { + t.Fatalf("Request resize for volume, but volume in ASW hasn't been marked as fsResizeRequired") + } + if len(resizeRequiredVolumes) != 1 { + t.Fatalf("Some unexpected volumes are marked as fsResizeRequired: %v", resizeRequiredVolumes) + } + if resizeRequiredVolumes[0] != uniqueVolumeName { + t.Fatalf("Mark wrong volume as fsResizeRequired: %s", resizeRequiredVolumes[0]) + } +} + +func volumeCapacity(size int) v1.ResourceList { + return v1.ResourceList{v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dGi", size))} +} + +func setExpandOnlinePersistentVolumesFeatureGate(value string, t *testing.T) { + err := utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=%s", features.ExpandPersistentVolumesFSWithoutUnmounting, value)) + if err != nil { + t.Fatalf("Set ExpandPersistentVolumesFSWithoutUnmounting feature gate to %s failed: %v", value, err) + } +} + +func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *testing.T) { + for _, volumeToMount := range dsw.GetVolumesToMount() { + err := asw.MarkVolumeAsAttached(volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", "") + if err != nil { + t.Fatalf("Unexpected error when MarkVolumeAsAttached: %v", err) + } + err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID, + volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec) + if err != nil { + t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err) + } + } +} + +func clearASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t *testing.T) { + for _, volumeToMount := range dsw.GetVolumesToMount() { + err := asw.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) + if err != nil { + t.Fatalf("Unexpected error when MarkVolumeAsUnmounted: %v", err) + } + } + for _, volumeToMount := range dsw.GetVolumesToMount() { + asw.MarkVolumeAsDetached(volumeToMount.VolumeName, "") + } +} + +func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePodName, + dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { + dswp.ReprocessPod(uniquePodName) + dswp.findAndAddNewPods() + return getResizeRequiredVolumes(dsw, asw) +} + +func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { + resizeRequiredVolumes := []v1.UniqueVolumeName{} + for _, volumeToMount := range dsw.GetVolumesToMount() { + _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + if cache.IsFSResizeRequiredError(err) { + resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) + } + } + return resizeRequiredVolumes +} + func verifyVolumeExistsInVolumesToMount(t *testing.T, expectedVolumeName v1.UniqueVolumeName, expectReportedInUse bool, dsw cache.DesiredStateOfWorld) { volumesToMount := dsw.GetVolumesToMount() for _, volume := range volumesToMount { diff --git a/pkg/kubelet/volumemanager/reconciler/BUILD b/pkg/kubelet/volumemanager/reconciler/BUILD index b2a41d00065..5877ceced44 100644 --- a/pkg/kubelet/volumemanager/reconciler/BUILD +++ b/pkg/kubelet/volumemanager/reconciler/BUILD @@ -38,6 +38,7 @@ go_test( srcs = ["reconciler_test.go"], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/kubelet/volumemanager/cache:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 5d6619d5b32..21a58394e4a 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -254,6 +254,22 @@ func (rc *reconciler) reconcile() { glog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr)) } } + } else if cache.IsFSResizeRequiredError(err) && + utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumesFSWithoutUnmounting) { + glog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", "")) + err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting( + volumeToMount.VolumeToMount, + rc.actualStateOfWorld) + if err != nil && + !nestedpendingoperations.IsAlreadyExists(err) && + !exponentialbackoff.IsExponentialBackoff(err) { + // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. + // Log all other errors. + glog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandVolumeFSWithoutUnmounting failed", err).Error()) + } + if err == nil { + glog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandVolumeFSWithoutUnmounting started", "")) + } } } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index bc615ddcaa2..c67ea7cc3fd 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" @@ -965,6 +966,120 @@ func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) { utilfeature.DefaultFeatureGate.Set("BlockVolume=false") } +// Populates desiredStateOfWorld cache with one volume/pod. +// Enables controllerAttachDetachEnabled. +// Calls Run() +// Wait for volume mounted. +// Mark volume as fsResizeRequired in ASW. +// Verifies volume's fsResizeRequired flag is cleared later. +func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { + utilfeature.DefaultFeatureGate.Set(fmt.Sprintf("%s=true", features.ExpandPersistentVolumesFSWithoutUnmounting)) + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv", + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "pv", + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + reconcilerSyncStatesSleepPeriod, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + volumePluginMgr, + kubeletPodsDir) + + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName}) + + // Start the reconciler to fill ASW. + stopChan, stoppedChan := make(chan struct{}), make(chan struct{}) + go func() { + reconciler.Run(stopChan) + close(stoppedChan) + }() + waitForMount(t, fakePlugin, volumeName, asw) + // Stop the reconciler. + close(stopChan) + <-stoppedChan + + // Mark volume as fsResizeRequired. + asw.MarkFSResizeRequired(volumeName, podName) + _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) + if !cache.IsFSResizeRequiredError(podExistErr) { + t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr) + } + + // Start the reconciler again, we hope reconciler will perform the + // resize operation and clear the fsResizeRequired flag for volume. + go reconciler.Run(wait.NeverStop) + + waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) { + mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + return mounted && err == nil, nil + }) + if waitErr != nil { + t.Fatal("Volume resize should succeeded") + } +} + func waitForMount( t *testing.T, fakePlugin *volumetesting.FakeVolumePlugin, @@ -1044,3 +1159,30 @@ func createTestClient() *fake.Clientset { func runReconciler(reconciler Reconciler) { go reconciler.Run(wait.NeverStop) } + +func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) *fake.Clientset { + fakeClient := &fake.Clientset{} + fakeClient.AddReactor("get", "nodes", + func(action core.Action) (bool, runtime.Object, error) { + return true, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)}, + Status: v1.NodeStatus{ + VolumesAttached: []v1.AttachedVolume{ + { + Name: "fake-plugin/pv", + DevicePath: "fake/path", + }, + }}, + }, nil + }) + fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { + return true, pvc, nil + }) + fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) { + return true, pv, nil + }) + fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("no reaction implemented for %s", action) + }) + return fakeClient +} diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 1cfe6dddc1a..fb805812d6e 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -439,6 +439,15 @@ func (plugin *FakeVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]st return []string{}, nil } +// Expandable volume support +func (plugin *FakeVolumePlugin) ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) { + return resource.Quantity{}, nil +} + +func (plugin *FakeVolumePlugin) RequiresFSResize() bool { + return true +} + type FakeFileVolumePlugin struct { } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index c065ace25e1..283a322259f 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -142,6 +142,8 @@ type OperationExecutor interface { IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool // Expand Volume will grow size available to PVC ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error + // ExpandVolumeFSWithoutUnmounting will resize volume's file system to expected size without unmounting the volume. + ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, mountPath string, pluginName string) (*volume.Spec, error) // CheckVolumeExistenceOperation checks volume existence @@ -173,6 +175,9 @@ type ActualStateOfWorldMounterUpdater interface { // Marks the specified volume as having its global mount unmounted. MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error + + // Marks the specified volume's file system resize request is finished. + MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error } // ActualStateOfWorldAttacherUpdater defines a set of operations updating the @@ -817,6 +822,14 @@ func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCW return oe.pendingOperations.Run(uniqueVolumeKey, "", generatedOperations) } +func (oe *operationExecutor) ExpandVolumeFSWithoutUnmounting(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { + generatedOperations, err := oe.operationGenerator.GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount, actualStateOfWorld) + if err != nil { + return err + } + return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) +} + func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 0551c4733fb..7d427e48f0f 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -438,6 +438,16 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeReques }, nil } +func (fopg *fakeOperationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + opFunc := func() (error, error) { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil, nil + } + return volumetypes.GeneratedOperations{ + OperationFunc: opFunc, + }, nil +} + func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginNane string, diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index a9312fe7fcf..582d74b6859 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -121,6 +121,9 @@ type OperationGenerator interface { map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (volumetypes.GeneratedOperations, error) + + // Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume. + GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) } func (og *operationGenerator) GenerateVolumesAreAttachedFunc( @@ -1306,6 +1309,62 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( }, nil } +func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( + volumeToMount VolumeToMount, + actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + volumePlugin, err := + og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) + if err != nil || volumePlugin == nil { + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) + } + + attachableVolumePlugin, err := + og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) + if err != nil || attachableVolumePlugin == nil { + if attachableVolumePlugin == nil { + err = fmt.Errorf("AttachableVolumePlugin is nil") + } + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindAttachablePluginBySpec failed", err) + } + + volumeAttacher, err := attachableVolumePlugin.NewAttacher() + if err != nil || volumeAttacher == nil { + if volumeAttacher == nil { + err = fmt.Errorf("VolumeAttacher is nil") + } + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.NewAttacher failed", err) + } + + deviceMountPath, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) + if err != nil { + return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.GetDeviceMountPath failed", err) + } + + fsResizeFunc := func() (error, error) { + resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, volumeToMount.DevicePath, deviceMountPath, volumePlugin.GetPluginName()) + if resizeSimpleError != nil || resizeDetailedError != nil { + return resizeSimpleError, resizeDetailedError + } + markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName) + if markFSResizedErr != nil { + // On failure, return error. Caller will log and retry. + return volumeToMount.GenerateError("VolumeFSResize.MarkVolumeAsResized failed", markFSResizedErr) + } + return nil, nil + } + eventRecorderFunc := func(err *error) { + if *err != nil { + og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error()) + } + } + + return volumetypes.GeneratedOperations{ + OperationFunc: fsResizeFunc, + EventRecorderFunc: eventRecorderFunc, + CompleteFunc: util.OperationCompleteHook(volumePlugin.GetPluginName(), "volume_fs_resize"), + }, nil +} + func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)