Merge pull request #127919 from carlory/fix-127852

Fix data race in kubelet/volumemanager
This commit is contained in:
Kubernetes Prow Robot 2024-10-17 14:57:03 +01:00 committed by GitHub
commit 1f9038a468
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 28 additions and 31 deletions

View File

@ -584,13 +584,13 @@ func (asw *actualStateOfWorld) GetAttachState(
} }
// SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller. // SetVolumeClaimSize sets size of the volume. But this function should not be used from attach_detach controller.
func (asw *actualStateOfWorld) InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { func (asw *actualStateOfWorld) InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize resource.Quantity) {
logger.V(5).Info("no-op InitializeClaimSize call in attach-detach controller") logger.V(5).Info("no-op InitializeClaimSize call in attach-detach controller")
} }
func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity { func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) resource.Quantity {
// not needed in attach-detach controller // not needed in attach-detach controller
return nil return resource.Quantity{}
} }
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume { func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {

View File

@ -321,7 +321,7 @@ type attachedVolume struct {
// persistentVolumeSize records size of the volume when pod was started or // persistentVolumeSize records size of the volume when pod was started or
// size after successful completion of volume expansion operation. // size after successful completion of volume expansion operation.
persistentVolumeSize *resource.Quantity persistentVolumeSize resource.Quantity
// seLinuxMountContext is the context with that the volume is mounted to global directory // seLinuxMountContext is the context with that the volume is mounted to global directory
// (via -o context=XYZ mount option). If nil, the volume is not mounted. If "", the volume is // (via -o context=XYZ mount option). If nil, the volume is not mounted. If "", the volume is
@ -788,7 +788,7 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
return nil return nil
} }
func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool { func (asw *actualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) bool {
asw.Lock() asw.Lock()
defer asw.Unlock() defer asw.Unlock()
@ -850,28 +850,28 @@ func (asw *actualStateOfWorld) SetDeviceMountState(
return nil return nil
} }
func (asw *actualStateOfWorld) InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) { func (asw *actualStateOfWorld) InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize resource.Quantity) {
asw.Lock() asw.Lock()
defer asw.Unlock() defer asw.Unlock()
volumeObj, ok := asw.attachedVolumes[volumeName] volumeObj, ok := asw.attachedVolumes[volumeName]
// only set volume claim size if claimStatusSize is zero // only set volume claim size if claimStatusSize is zero
// this can happen when volume was rebuilt after kubelet startup // this can happen when volume was rebuilt after kubelet startup
if ok && volumeObj.persistentVolumeSize == nil { if ok && volumeObj.persistentVolumeSize.IsZero() {
volumeObj.persistentVolumeSize = claimSize volumeObj.persistentVolumeSize = claimSize
asw.attachedVolumes[volumeName] = volumeObj asw.attachedVolumes[volumeName] = volumeObj
} }
} }
func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity { func (asw *actualStateOfWorld) GetClaimSize(volumeName v1.UniqueVolumeName) resource.Quantity {
asw.RLock() asw.RLock()
defer asw.RUnlock() defer asw.RUnlock()
volumeObj, ok := asw.attachedVolumes[volumeName] volumeObj, ok := asw.attachedVolumes[volumeName]
if ok { if ok {
return volumeObj.persistentVolumeSize return volumeObj.persistentVolumeSize.DeepCopy()
} }
return nil return resource.Quantity{}
} }
func (asw *actualStateOfWorld) DeletePodFromVolume( func (asw *actualStateOfWorld) DeletePodFromVolume(
@ -970,20 +970,17 @@ func (asw *actualStateOfWorld) PodHasMountedVolumes(podName volumetypes.UniquePo
} }
func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) { func (asw *actualStateOfWorld) volumeNeedsExpansion(volumeObj attachedVolume, desiredVolumeSize resource.Quantity) (resource.Quantity, bool) {
currentSize := resource.Quantity{} currentSize := volumeObj.persistentVolumeSize.DeepCopy()
if volumeObj.persistentVolumeSize != nil {
currentSize = volumeObj.persistentVolumeSize.DeepCopy()
}
if volumeObj.volumeInUseErrorForExpansion { if volumeObj.volumeInUseErrorForExpansion {
return currentSize, false return currentSize, false
} }
if volumeObj.persistentVolumeSize == nil || desiredVolumeSize.IsZero() { if volumeObj.persistentVolumeSize.IsZero() || desiredVolumeSize.IsZero() {
return currentSize, false return currentSize, false
} }
klog.V(5).InfoS("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName) klog.V(5).InfoS("NodeExpandVolume checking size", "actualSize", volumeObj.persistentVolumeSize.String(), "desiredSize", desiredVolumeSize.String(), "volume", volumeObj.volumeName)
if desiredVolumeSize.Cmp(*volumeObj.persistentVolumeSize) > 0 { if desiredVolumeSize.Cmp(volumeObj.persistentVolumeSize) > 0 {
volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec) volumePlugin, err := asw.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeObj.spec)
if err != nil || volumePlugin == nil { if err != nil || volumePlugin == nil {
// Log and continue processing // Log and continue processing

View File

@ -134,7 +134,7 @@ type DesiredStateOfWorld interface {
// UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world // UpdatePersistentVolumeSize updates persistentVolumeSize in desired state of the world
// so as it can be compared against actual size and volume expansion performed // so as it can be compared against actual size and volume expansion performed
// if necessary // if necessary
UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity)
} }
// VolumeToMount represents a volume that is attached to this node and needs to // VolumeToMount represents a volume that is attached to this node and needs to
@ -206,7 +206,7 @@ type volumeToMount struct {
// persistentVolumeSize records desired size of a persistent volume. // persistentVolumeSize records desired size of a persistent volume.
// Usually this value reflects size recorded in pv.Spec.Capacity // Usually this value reflects size recorded in pv.Spec.Capacity
persistentVolumeSize *resource.Quantity persistentVolumeSize resource.Quantity
// effectiveSELinuxMountFileLabel is the SELinux label that will be applied to the volume using mount options. // effectiveSELinuxMountFileLabel is the SELinux label that will be applied to the volume using mount options.
// If empty, then: // If empty, then:
@ -347,7 +347,7 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage()
if pvCap != nil { if pvCap != nil {
pvCapCopy := pvCap.DeepCopy() pvCapCopy := pvCap.DeepCopy()
vmt.persistentVolumeSize = &pvCapCopy vmt.persistentVolumeSize = pvCapCopy
} }
} }
dsw.volumesToMount[volumeName] = vmt dsw.volumesToMount[volumeName] = vmt
@ -499,7 +499,7 @@ func (dsw *desiredStateOfWorld) DeletePodFromVolume(
// UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and // UpdatePersistentVolumeSize updates last known PV size. This is used for volume expansion and
// should be only used for persistent volumes. // should be only used for persistent volumes.
func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size *resource.Quantity) { func (dsw *desiredStateOfWorld) UpdatePersistentVolumeSize(volumeName v1.UniqueVolumeName, size resource.Quantity) {
dsw.Lock() dsw.Lock()
defer dsw.Unlock() defer dsw.Unlock()
@ -610,7 +610,7 @@ func (dsw *desiredStateOfWorld) GetVolumesToMount() []VolumeToMount {
SELinuxLabel: volumeObj.effectiveSELinuxMountFileLabel, SELinuxLabel: volumeObj.effectiveSELinuxMountFileLabel,
}, },
} }
if volumeObj.persistentVolumeSize != nil { if !volumeObj.persistentVolumeSize.IsZero() {
vmt.DesiredPersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy() vmt.DesiredPersistentVolumeSize = volumeObj.persistentVolumeSize.DeepCopy()
} }
volumesToMount = append(volumesToMount, vmt) volumesToMount = append(volumesToMount, vmt)

View File

@ -384,8 +384,8 @@ func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
return return
} }
pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage() pvCap := volumeSpec.PersistentVolume.Spec.Capacity.Storage().DeepCopy()
pvcStatusCap := pvc.Status.Capacity.Storage() pvcStatusCap := pvc.Status.Capacity.Storage().DeepCopy()
dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap) dswp.desiredStateOfWorld.UpdatePersistentVolumeSize(uniqueVolumeName, pvCap)
klog.V(5).InfoS("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName) klog.V(5).InfoS("NodeExpandVolume updating size", "actualSize", pvcStatusCap.String(), "desiredSize", pvCap.String(), "volumeName", uniqueVolumeName)
// in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value // in case the actualStateOfWorld was rebuild after kubelet restart ensure that claimSize is set to accurate value

View File

@ -1374,7 +1374,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
t.Logf("Changing size of the volume to %s", tc.newPVSize.String()) t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
newSize := tc.newPVSize.DeepCopy() newSize := tc.newPVSize.DeepCopy()
dsw.UpdatePersistentVolumeSize(volumeName, &newSize) dsw.UpdatePersistentVolumeSize(volumeName, newSize)
_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */) _, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */)
if tc.expansionFailed { if tc.expansionFailed {

View File

@ -266,7 +266,7 @@ func (f *fakeActualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts MarkVolumeOp
} }
// MarkVolumeAsResized implements ActualStateOfWorldMounterUpdater. // MarkVolumeAsResized implements ActualStateOfWorldMounterUpdater.
func (f *fakeActualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool { func (f *fakeActualStateOfWorld) MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) bool {
panic("unimplemented") panic("unimplemented")
} }

View File

@ -198,7 +198,7 @@ type ActualStateOfWorldMounterUpdater interface {
MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
// Marks the specified volume's file system resize request is finished. // Marks the specified volume's file system resize request is finished.
MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize resource.Quantity) bool
// GetDeviceMountState returns mount state of the device in global path // GetDeviceMountState returns mount state of the device in global path
GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
@ -277,9 +277,9 @@ type ActualStateOfWorldAttacherUpdater interface {
AddVolumeToReportAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName) AddVolumeToReportAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName)
// InitializeClaimSize sets pvc claim size by reading pvc.Status.Capacity // InitializeClaimSize sets pvc claim size by reading pvc.Status.Capacity
InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize resource.Quantity)
GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity GetClaimSize(volumeName v1.UniqueVolumeName) resource.Quantity
} }
// VolumeLogger defines a set of operations for generating volume-related logging and error msgs // VolumeLogger defines a set of operations for generating volume-related logging and error msgs

View File

@ -1436,7 +1436,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName) claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName)
// only fetch claimSize if it was not set previously // only fetch claimSize if it was not set previously
if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil && !volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration { if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize.IsZero() && !volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
pv := volumeToMount.VolumeSpec.PersistentVolume pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{}) pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil { if err != nil {
@ -1445,7 +1445,7 @@ func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
} }
pvcStatusSize := pvc.Status.Capacity.Storage() pvcStatusSize := pvc.Status.Capacity.Storage()
if pvcStatusSize != nil { if pvcStatusSize != nil {
claimSize = pvcStatusSize claimSize = pvcStatusSize.DeepCopy()
} }
} }
@ -1964,7 +1964,7 @@ func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
return false, e1, e2 return false, e1, e2
} }
if resizeDone { if resizeDone {
markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &newSize) markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, newSize)
if !markingDone { if !markingDone {
// On failure, return error. Caller will log and retry. // On failure, return error. Caller will log and retry.
genericFailureError := fmt.Errorf("unable to mark volume as resized") genericFailureError := fmt.Errorf("unable to mark volume as resized")