diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index bc8495dd93b..00943d13bc4 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -655,6 +655,19 @@ func (asw *actualStateOfWorld) SetDeviceMountState( return nil } +func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) { + asw.Lock() + defer asw.Unlock() + + volumeObj, ok := asw.attachedVolumes[volumeName] + // only set volume claim size if claimStatusSize is zero + // this can happen when volume was rebuilt after kubelet startup + if ok && volumeObj.persistentVolumeSize.IsZero() { + volumeObj.persistentVolumeSize = claimSize + asw.attachedVolumes[volumeName] = volumeObj + } +} + func (asw *actualStateOfWorld) DeletePodFromVolume( podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error { asw.Lock() diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 1188d2ef72f..8694edf2597 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -169,7 +169,7 @@ func (rc *reconciler) reconcile() { // attach if kubelet is responsible for attaching volumes. // If underlying PVC was resized while in-use then this function also handles volume // resizing. - rc.mountAttachVolumes() + rc.mountOrAttachVolumes() // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() @@ -193,82 +193,94 @@ func (rc *reconciler) unmountVolumes() { } } -func (rc *reconciler) mountAttachVolumes() { +func (rc *reconciler) mountOrAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { - if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { - //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens - if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) - continue - } - // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait - // for controller to finish attaching volume. - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.VerifyControllerAttachedVolume( - volumeToMount.VolumeToMount, - rc.nodeName, - rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } - } else { - // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, - // so attach it - volumeToAttach := operationexecutor.VolumeToAttach{ - VolumeName: volumeToMount.VolumeName, - VolumeSpec: volumeToMount.VolumeSpec, - NodeName: rc.nodeName, - } - klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } - } + rc.waitForVolumeAttach(volumeToMount) } else if !volMounted || cache.IsRemountRequiredError(err) { - // Volume is not mounted, or is already mounted, but requires remounting - remountingLogStr := "" - isRemount := cache.IsRemountRequiredError(err) - if isRemount { - remountingLogStr = "Volume is already mounted to pod, but remount was requested." - } - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.MountVolume( - rc.waitForAttachTimeout, - volumeToMount.VolumeToMount, - rc.actualStateOfWorld, - isRemount) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - if remountingLogStr == "" { - klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - } else { - klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) - } - } + rc.mountAttachedVolumes(volumeToMount, err) } else if cache.IsFSResizeRequiredError(err) { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) - err := rc.operationExecutor.ExpandInUseVolume( - volumeToMount.VolumeToMount, - rc.actualStateOfWorld) - if err != nil && !isExpectedError(err) { - klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) - } - if err == nil { - klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) - } + rc.expandVolume(volumeToMount) + } + } +} + +func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount) { + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld) + + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + + if err == nil { + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + } +} + +func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) { + // Volume is not mounted, or is already mounted, but requires remounting + remountingLogStr := "" + isRemount := cache.IsRemountRequiredError(podExistError) + if isRemount { + remountingLogStr = "Volume is already mounted to pod, but remount was requested." + } + klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.MountVolume( + rc.waitForAttachTimeout, + volumeToMount.VolumeToMount, + rc.actualStateOfWorld, + isRemount) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + if remountingLogStr == "" { + klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + } else { + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod)) + } + } +} + +func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { + if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { + //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens + if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse { + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod)) + return + } + // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait + // for controller to finish attaching volume. + klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.VerifyControllerAttachedVolume( + volumeToMount.VolumeToMount, + rc.nodeName, + rc.actualStateOfWorld) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) + } + } else { + // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher, + // so attach it + volumeToAttach := operationexecutor.VolumeToAttach{ + VolumeName: volumeToMount.VolumeName, + VolumeSpec: volumeToMount.VolumeSpec, + NodeName: rc.nodeName, + } + klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod)) + err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld) + if err != nil && !isExpectedError(err) { + klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod)) + } + if err == nil { + klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod)) } } } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 244b63e5c3f..7c6d3970511 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -245,6 +245,9 @@ type ActualStateOfWorldAttacherUpdater interface { // Unmarks the desire to detach for the specified volume (add the volume back to // the node's volumesToReportAsAttached list) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) + + // SetVolumeClaimSize sets pvc claim size by reading pvc.Status.Capacity + SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) } // VolumeLogger defines a set of operations for generating volume-related logging and error msgs diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 50fcac3cbf0..e99912a04b2 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1491,6 +1491,21 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) + var claimSize resource.Quantity + + if volumeToMount.VolumeSpec.PersistentVolume != nil { + pv := volumeToMount.VolumeSpec.PersistentVolume + pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) + if err != nil { + eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + pvcStatusSize := pvc.Status.Capacity.Storage() + if pvcStatusSize != nil { + claimSize = *pvcStatusSize + } + } + if !volumeToMount.PluginIsAttachable { // If the volume does not implement the attacher interface, it is // assumed to be attached and the actual state of the world is @@ -1503,7 +1518,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } - + actualStateOfWorld.SetVolumeClaimSize(volumeToMount.VolumeName, claimSize) return volumetypes.NewOperationContext(nil, nil, migrated) } @@ -1544,6 +1559,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } + actualStateOfWorld.SetVolumeClaimSize(volumeToMount.VolumeName, claimSize) return volumetypes.NewOperationContext(nil, nil, migrated) } }