From e4f62d6c41257d8f4a1d099c072bb6259647fd01 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Tue, 8 Mar 2022 14:35:54 -0500 Subject: [PATCH] Modify code to use new interface functions --- .../cache/actual_state_of_world.go | 5 +++ .../cache/actual_state_of_world.go | 38 +++++++++++++++---- .../cache/actual_state_of_world_test.go | 6 +-- .../cache/desired_state_of_world.go | 13 ++++--- .../desired_state_of_world_populator_test.go | 2 +- .../volumemanager/reconciler/reconciler.go | 2 +- .../reconciler/reconciler_test.go | 6 +-- .../operationexecutor/operation_executor.go | 2 +- .../operationexecutor/operation_generator.go | 4 +- 9 files changed, 55 insertions(+), 23 deletions(-) diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go index 652d5421d9a..180f1a7b6ff 100644 --- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go +++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go @@ -23,6 +23,7 @@ package cache import ( "fmt" + "k8s.io/apimachinery/pkg/api/resource" "sync" "time" @@ -581,6 +582,10 @@ func (asw *actualStateOfWorld) GetAttachState( return AttachStateDetached } +func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { + klog.V(5).Infof("doing nothing") +} + func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { asw.RLock() defer asw.RUnlock() diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 00943d13bc4..a6b31f8c56b 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -107,7 +107,7 @@ type ActualStateOfWorld interface { // volumes, depend on this to update the contents of the volume. // All volume mounting calls should be idempotent so a second mount call for // volumes that do not need to update contents should not fail. - PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error) + PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName, desiredVolumeSize *resource.Quantity) (bool, string, error) // PodRemovedFromVolume returns true if the given pod does not exist in the list of // mountedPods for the given volume in the cache, indicating that the pod has @@ -287,7 +287,7 @@ type attachedVolume struct { volumeInUseErrorForExpansion bool // persistentVolumeSize records size of the volume when pod was started. - persistentVolumeSize resource.Quantity + persistentVolumeSize *resource.Quantity } // The mountedPod object represents a pod for which the kubelet volume manager @@ -655,14 +655,14 @@ func (asw *actualStateOfWorld) SetDeviceMountState( return nil } -func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) { +func (asw *actualStateOfWorld) SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { asw.Lock() defer asw.Unlock() volumeObj, ok := asw.attachedVolumes[volumeName] // only set volume claim size if claimStatusSize is zero // this can happen when volume was rebuilt after kubelet startup - if ok && volumeObj.persistentVolumeSize.IsZero() { + if ok && volumeObj.persistentVolumeSize == nil { volumeObj.persistentVolumeSize = claimSize asw.attachedVolumes[volumeName] = volumeObj } @@ -710,7 +710,8 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro func (asw *actualStateOfWorld) PodExistsInVolume( podName volumetypes.UniquePodName, - volumeName v1.UniqueVolumeName) (bool, string, error) { + volumeName v1.UniqueVolumeName, + desiredVolumeSize *resource.Quantity) (bool, string, error) { asw.RLock() defer asw.RUnlock() @@ -728,8 +729,7 @@ func (asw *actualStateOfWorld) PodExistsInVolume( if podObj.remountRequired { return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } - if podObj.fsResizeRequired && - !volumeObj.volumeInUseErrorForExpansion { + if asw.volumeNeedsExpansion(volumeObj, desiredVolumeSize) { return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName) } } @@ -737,6 +737,30 @@ func (asw *actualStateOfWorld) PodExistsInVolume( return podExists, volumeObj.devicePath, nil } +func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize *resource.Quantity) bool { + if volumeObj.volumeInUseErrorForExpansion { + return false + } + if volumeObj.persistentVolumeSize == nil || desiredVolumeSize == nil { + return false + } + + if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 { + volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) + if err != nil || volumePlugin == nil { + // Log and continue processing + klog.Errorf( + "PodExistsInVolume failed to find expandable plugin volume: %q (volSpecName: %q)", + volumeObj.volumeName, volumeObj.spec.Name()) + return false + } + if volumePlugin.RequiresFSResize() { + return true + } + } + return false +} + func (asw *actualStateOfWorld) PodRemovedFromVolume( podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) bool { diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 1456bdd888b..6f68cf5574f 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -676,7 +676,7 @@ func TestUncertainVolumeMounts(t *testing.T) { t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name()) } - volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1) + volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1, nil) if volExists { t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) } @@ -762,7 +762,7 @@ func verifyPodExistsInVolumeAsw( expectedDevicePath string, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(expectedPodName, expectedVolumeName) + asw.PodExistsInVolume(expectedPodName, expectedVolumeName, nil) if err != nil { t.Fatalf( "ASW PodExistsInVolume failed. Expected: Actual: <%v>", err) @@ -804,7 +804,7 @@ func verifyPodDoesntExistInVolumeAsw( expectVolumeToExist bool, asw ActualStateOfWorld) { podExistsInVolume, devicePath, err := - asw.PodExistsInVolume(podToCheck, volumeToCheck) + asw.PodExistsInVolume(podToCheck, volumeToCheck, nil) if !expectVolumeToExist && err == nil { t.Fatalf( "ASW PodExistsInVolume did not return error. Expected: Actual: <%v>", err) diff --git a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go index 1dfdf920774..fe9d7f23c4d 100644 --- a/pkg/kubelet/volumemanager/cache/desired_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/desired_state_of_world.go @@ -128,13 +128,14 @@ type DesiredStateOfWorld interface { // UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world // so as it can be compared against actual size and volume expansion performed // if necessary - UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity) + UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) } // VolumeToMount represents a volume that is attached to this node and needs to // be mounted to PodName. type VolumeToMount struct { operationexecutor.VolumeToMount + PersistentVolumeSize *resource.Quantity } // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld. @@ -193,7 +194,7 @@ type volumeToMount struct { desiredSizeLimit *resource.Quantity // persistentVolumeSize records desired size of a persistent volume. - persistentVolumeSize resource.Quantity + persistentVolumeSize *resource.Quantity } // The pod object represents a pod that references the underlying volume and @@ -295,7 +296,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume( if volumeSpec.PersistentVolume != nil { pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() if pvCap != nil { - vmt.persistentVolumeSize = *pvCap + vmt.persistentVolumeSize = pvCap } } @@ -366,7 +367,7 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume( // UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and // should be only used for persistent volumes. -func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity) { +func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) { dsw.Lock() defer dsw.Unlock() @@ -447,7 +448,9 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount { VolumeGidValue: volumeObj.volumeGidValue, ReportedInUse: volumeObj.reportedInUse, MountRequestTime: podObj.mountRequestTime, - DesiredSizeLimit: volumeObj.desiredSizeLimit}}) + DesiredSizeLimit: volumeObj.desiredSizeLimit}, + PersistentVolumeSize: volumeObj.persistentVolumeSize, + }) } } return volumesToMount diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index e32dcd6a9f2..6827036b085 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -1108,7 +1108,7 @@ func reprocess(dswp *desiredStateOfWorldPopulator, uniquePodName types.UniquePod func getResizeRequiredVolumes(dsw cache.DesiredStateOfWorld, asw cache.ActualStateOfWorld) []v1.UniqueVolumeName { resizeRequiredVolumes := []v1.UniqueVolumeName{} for _, volumeToMount := range dsw.GetVolumesToMount() { - _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + _, _, err := asw.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, nil) if cache.IsFSResizeRequiredError(err) { resizeRequiredVolumes = append(resizeRequiredVolumes, volumeToMount.VolumeName) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 8694edf2597..a1b258f10ac 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -196,7 +196,7 @@ func (rc *reconciler) unmountVolumes() { func (rc *reconciler) mountOrAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { - volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) + volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize) volumeToMount.DevicePath = devicePath if cache.IsVolumeNotAttachedError(err) { rc.waitForVolumeAttach(volumeToMount) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index a43110c5341..4b5759f243f 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -1287,7 +1287,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { // mark volume as resize required asw.MarkFSResizeRequired(volumeName, podName) - _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName) + _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, nil) if tc.expansionFailed { if cache.IsFSResizeRequiredError(podExistErr) { t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr) @@ -1299,7 +1299,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { go reconciler.Run(wait.NeverStop) waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil) return mounted && err == nil, nil }) if waitErr != nil { @@ -1791,7 +1791,7 @@ func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podN err := retryWithExponentialBackOff( testOperationBackOffDuration, func() (bool, error) { - mounted, _, err := asw.PodExistsInVolume(podName, volumeName) + mounted, _, err := asw.PodExistsInVolume(podName, volumeName, nil) if mounted || err != nil { return false, nil } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 7c6d3970511..1f0d713287b 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -247,7 +247,7 @@ type ActualStateOfWorldAttacherUpdater interface { AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) // SetVolumeClaimSize sets pvc claim size by reading pvc.Status.Capacity - SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) + SetVolumeClaimSize(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) } // VolumeLogger defines a set of operations for generating volume-related logging and error msgs diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index e99912a04b2..4c9637089cb 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -1491,7 +1491,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext { migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec) - var claimSize resource.Quantity + var claimSize *resource.Quantity if volumeToMount.VolumeSpec.PersistentVolume != nil { pv := volumeToMount.VolumeSpec.PersistentVolume @@ -1502,7 +1502,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( } pvcStatusSize := pvc.Status.Capacity.Storage() if pvcStatusSize != nil { - claimSize = *pvcStatusSize + claimSize = pvcStatusSize } }