Fix code to process volumes which were skipped during reconstruction

This commit is contained in:
Hemant Kumar 2022-06-14 15:37:21 -04:00
parent c8b85fb470
commit eb071c2755
4 changed files with 149 additions and 34 deletions
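For orientation before the per-file diffs: volume reconstruction used to skip any volume that was still present in the desired state of world (DSOW), on the assumption that the normal mount path would soon add it to the actual state of world (ASOW). If that mount then failed, the volume ended up tracked in neither place. This change keeps the skipped volumes in a new skippedDuringReconstruction map and, on the next reconcile pass, records any that are still unmounted as mounted in an uncertain state. The sketch below is only a minimal, self-contained illustration of that bookkeeping; volumeName, podName, actualState and the toy reconciler are simplified stand-ins, not the kubelet types used in the diff.

package main

import "fmt"

// Simplified stand-ins for the kubelet types; the real code uses
// v1.UniqueVolumeName, volumetypes.UniquePodName and the reconciler/ASOW packages.
type volumeName string
type podName string

type mountState int

const (
	notMounted mountState = iota
	mountUncertain
	mounted
)

// actualState is a toy actual state of world: volume -> pod -> mount state.
type actualState map[volumeName]map[podName]mountState

func (a actualState) markUncertain(v volumeName, p podName) {
	if a[v] == nil {
		a[v] = map[podName]mountState{}
	}
	a[v][p] = mountUncertain
}

// reconciler remembers the volumes that reconstruction skipped because they
// were still present in the desired state of world.
type reconciler struct {
	skippedDuringReconstruction map[volumeName][]podName
	asw                         actualState
}

// processReconstructedVolumes mirrors the idea of the commit: any skipped
// volume whose pod never made it into the actual state is recorded there as
// uncertain, so the next reconcile loop can retry the mount or clean it up
// instead of losing track of it. (The real code checks PodRemovedFromVolume
// and also handles the device mount; this shows only the pod-level part.)
func (rc *reconciler) processReconstructedVolumes() {
	for v, pods := range rc.skippedDuringReconstruction {
		for _, p := range pods {
			if _, tracked := rc.asw[v][p]; !tracked {
				rc.asw.markUncertain(v, p)
			}
		}
	}
	// the queue is processed exactly once after startup reconstruction
	rc.skippedDuringReconstruction = map[volumeName][]podName{}
}

func main() {
	rc := &reconciler{
		skippedDuringReconstruction: map[volumeName][]podName{"vol-1": {"pod-a"}},
		asw:                         actualState{},
	}
	rc.processReconstructedVolumes()
	fmt.Println(rc.asw["vol-1"]["pod-a"] == mountUncertain) // prints: true
}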

View File

@ -198,6 +198,7 @@ func NewActualStateOfWorld(
return &actualStateOfWorld{
nodeName: nodeName,
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
foundDuringReconstruction: make(map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID),
volumePluginMgr: volumePluginMgr,
}
}
@ -227,6 +228,7 @@ type actualStateOfWorld struct {
// The key in this map is the name of the volume and the value is an object
// containing more information about the attached volume.
attachedVolumes map[v1.UniqueVolumeName]attachedVolume
foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
@ -346,6 +348,40 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolume(volumeName)
}
func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error {
err := asw.MarkVolumeAsMounted(opts)
if err != nil {
return err
}
asw.Lock()
defer asw.Unlock()
podMap, ok := asw.foundDuringReconstruction[opts.VolumeName]
if !ok {
podMap = map[volumetypes.UniquePodName]types.UID{}
}
podMap[opts.PodName] = opts.PodUID
asw.foundDuringReconstruction[opts.VolumeName] = podMap
return nil
}
func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool {
// query the mount state before taking the lock; GetVolumeMountState acquires
// the same read lock internally
volumeState := asw.GetVolumeMountState(volumeName, podName)
// only uncertain volumes are reconstructed
if volumeState != operationexecutor.VolumeMountUncertain {
return false
}
asw.RLock()
defer asw.RUnlock()
podMap, ok := asw.foundDuringReconstruction[volumeName]
if !ok {
return false
}
_, foundPod := podMap[podName]
return foundPod
}
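The two methods above form a pair: AddVolumeViaReconstruction records, per volume, which pods were added to the actual state by reconstruction, and IsVolumeReconstructed answers true only while that pod's mount is still in the uncertain state. A toy version of the pair is sketched below, with plain maps standing in for actualStateOfWorld and its locking; the type and method names here are illustrative, not the kubelet API.

package main

import "fmt"

type volumeName string
type podName string

type mountState int

const (
	mountUncertain mountState = iota
	mountedOK
)

// asw is a toy actual state of world: per-pod mount state plus the set of
// volumes/pods that were found during reconstruction.
type asw struct {
	mountStates               map[volumeName]map[podName]mountState
	foundDuringReconstruction map[volumeName]map[podName]bool
}

// addViaReconstruction marks the pod/volume as mounted (uncertain here) and
// remembers that it came from reconstruction.
func (a *asw) addViaReconstruction(v volumeName, p podName) {
	if a.mountStates[v] == nil {
		a.mountStates[v] = map[podName]mountState{}
	}
	a.mountStates[v][p] = mountUncertain
	if a.foundDuringReconstruction[v] == nil {
		a.foundDuringReconstruction[v] = map[podName]bool{}
	}
	a.foundDuringReconstruction[v][p] = true
}

// isVolumeReconstructed mirrors the predicate in the diff: only an uncertain
// mount that was also recorded during reconstruction counts.
func (a *asw) isVolumeReconstructed(v volumeName, p podName) bool {
	state, ok := a.mountStates[v][p]
	if !ok || state != mountUncertain {
		return false
	}
	return a.foundDuringReconstruction[v][p]
}

func main() {
	a := &asw{
		mountStates:               map[volumeName]map[podName]mountState{},
		foundDuringReconstruction: map[volumeName]map[podName]bool{},
	}
	a.addViaReconstruction("vol-1", "pod-a")
	fmt.Println(a.isVolumeReconstructed("vol-1", "pod-a")) // true
	a.mountStates["vol-1"]["pod-a"] = mountedOK            // a later successful SetUp
	fmt.Println(a.isVolumeReconstructed("vol-1", "pod-a")) // false: no longer uncertain
}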
func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error {
return asw.AddPodToVolume(markVolumeOpts)
}
@ -641,6 +677,12 @@ func (asw *actualStateOfWorld) DeletePodFromVolume(
delete(asw.attachedVolumes[volumeName].mountedPods, podName)
}
// if there were reconstructed volumes, we should remove them
_, podExists = asw.foundDuringReconstruction[volumeName]
if podExists {
delete(asw.foundDuringReconstruction[volumeName], podName)
}
return nil
}
@ -661,6 +703,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro
}
delete(asw.attachedVolumes, volumeName)
delete(asw.foundDuringReconstruction, volumeName)
return nil
}
@ -739,7 +782,6 @@ func (asw *actualStateOfWorld) PodRemovedFromVolume(
return false
}
}
return true
}

View File

@ -119,6 +119,7 @@ func NewReconciler(
operationExecutor: operationExecutor,
mounter: mounter,
hostutil: hostutil,
skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{},
volumePluginMgr: volumePluginMgr,
kubeletPodsDir: kubeletPodsDir,
timeOfLastSync: time.Time{},
@ -138,6 +139,7 @@ type reconciler struct {
mounter mount.Interface
hostutil hostutil.HostUtils
volumePluginMgr *volumepkg.VolumePluginMgr
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
kubeletPodsDir string
timeOfLastSync time.Time
}
@ -175,6 +177,10 @@ func (rc *reconciler) reconcile() {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
if len(rc.skippedDuringReconstruction) > 0 {
rc.processReconstructedVolumes()
}
}
func (rc *reconciler) unmountVolumes() {
@ -249,6 +255,63 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po
}
}
// processReconstructedVolumes checks volumes which were skipped during the reconstruction
// process because it was assumed that since these volumes were present in DSOW they would get
// mounted correctly and make it into ASOW.
// But if mount operation fails for some reason then we still need to mark the volume as uncertain
// and wait for the next reconciliation loop to deal with it.
func (rc *reconciler) processReconstructedVolumes() {
if rc.kubeClient != nil {
rc.updateDevicePath(rc.skippedDuringReconstruction)
}
for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction {
// check if volume is marked as attached to the node
// for now lets only process volumes which are at least known as attached to the node
// this should help with most volume types (including secret, configmap etc)
if !rc.actualStateOfWorld.VolumeExists(volumeName) {
klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName)
delete(rc.skippedDuringReconstruction, volumeName)
continue
}
uncertainVolumeCount := 0
for podName, volume := range glblVolumeInfo.podVolumes {
volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName)
// if volume is not mounted then lets mark volume mounted in uncertain state in ASOW
if volumeNotMounted {
err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain)
uncertainVolumeCount += 1
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
}
if uncertainVolumeCount > 0 {
// If the volume has device to mount, we mark its device as mounted.
if glblVolumeInfo.deviceMounter != nil || glblVolumeInfo.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(glblVolumeInfo)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName)
continue
}
currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName)
if currentMountState == operationexecutor.DeviceNotMounted {
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath)
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName)
}
}
}
}
rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo)
}
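One detail of processReconstructedVolumes worth noting: the device is handled only after the pod-level loop, and only when at least one pod volume was just marked uncertain and the actual state does not already record the device as mounted. The snippet below sketches that gating only, under simplified stand-in types; deviceMountState and markDeviceIfNeeded are illustrative names, not the operationexecutor API (which also requires the volume to have a device mounter or block mapper).

package main

import "fmt"

type deviceMountState int

const (
	deviceNotMounted deviceMountState = iota
	deviceMountUncertain
	deviceMounted
)

// markDeviceIfNeeded applies the same gating as the loop above: skip the device
// entirely when no pod volume was marked uncertain, and never overwrite a device
// that the actual state already records as mounted (or already uncertain).
func markDeviceIfNeeded(uncertainVolumeCount int, current deviceMountState) deviceMountState {
	if uncertainVolumeCount == 0 {
		return current
	}
	if current != deviceNotMounted {
		return current
	}
	return deviceMountUncertain
}

func main() {
	fmt.Println(markDeviceIfNeeded(0, deviceNotMounted) == deviceNotMounted)     // true: nothing to do
	fmt.Println(markDeviceIfNeeded(2, deviceNotMounted) == deviceMountUncertain) // true: device marked uncertain
	fmt.Println(markDeviceIfNeeded(2, deviceMounted) == deviceMounted)           // true: already mounted, left alone
}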
func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
@ -418,21 +481,6 @@ func (rc *reconciler) syncStates(kubeletPodDir string) {
rc.cleanupMounts(volume)
continue
}
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
@ -445,6 +493,22 @@ func (rc *reconciler) syncStates(kubeletPodDir string) {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
@ -666,18 +730,7 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl
continue
}
for _, volume := range gvl.podVolumes {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMounted,
}
err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
@ -702,6 +755,22 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl
return nil
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.

View File

@ -215,6 +215,10 @@ type ActualStateOfWorldMounterUpdater interface {
// MarkForInUseExpansionError marks the volume to have in-use error during expansion.
// volume expansion must not be retried for this volume
MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
AddVolumeViaReconstruction(opts MarkVolumeOpts) error
IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
}
// ActualStateOfWorldAttacherUpdater defines a set of operations updating the // ActualStateOfWorldAttacherUpdater defines a set of operations updating the

View File

@ -784,7 +784,8 @@ func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount,
func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
if volumetypes.IsOperationFinishedError(mountError) &&
actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain &&
!actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
if t != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
@ -799,7 +800,6 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount,
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error()) klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
} }
} }
} }
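The guard added above changes error handling after a failed mount: previously, any volume whose mount state was uncertain was flipped back to unmounted once the operation returned a final error; with the new clause, a volume that is uncertain because it was reconstructed keeps its uncertain state, so kubelet continues to track it and retry cleanup or remount instead of forgetting the mount point. A minimal sketch of that decision follows, with plain booleans in place of the real VolumeToMount/ASOW plumbing; the function name is illustrative, not the operationexecutor API.

package main

import "fmt"

// shouldMarkUnmounted mirrors the new condition in markVolumeErrorState: only a
// finished (non-retryable) error on an uncertain mount that was NOT found during
// reconstruction downgrades the volume to "unmounted".
func shouldMarkUnmounted(operationFinished, mountUncertain, reconstructed bool) bool {
	return operationFinished && mountUncertain && !reconstructed
}

func main() {
	// ordinary uncertain mount that failed for good: safe to mark unmounted
	fmt.Println(shouldMarkUnmounted(true, true, false)) // true
	// reconstructed volume: keep it uncertain so it is still tracked and cleaned up
	fmt.Println(shouldMarkUnmounted(true, true, true)) // false
}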
func (og *operationGenerator) GenerateUnmountVolumeFunc(