diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 7deb6df910d..c279b9f862a 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -196,9 +196,10 @@ func NewActualStateOfWorld( nodeName types.NodeName, volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld { return &actualStateOfWorld{ - nodeName: nodeName, - attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume), - volumePluginMgr: volumePluginMgr, + nodeName: nodeName, + attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume), + foundDuringReconstruction: make(map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID), + volumePluginMgr: volumePluginMgr, } } @@ -226,7 +227,8 @@ type actualStateOfWorld struct { // state by default. // The key in this map is the name of the volume and the value is an object // containing more information about the attached volume. - attachedVolumes map[v1.UniqueVolumeName]attachedVolume + attachedVolumes map[v1.UniqueVolumeName]attachedVolume + foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID // volumePluginMgr is the volume plugin manager used to create volume // plugin objects. @@ -346,6 +348,40 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } +func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error { + err := asw.MarkVolumeAsMounted(opts) + if err != nil { + return err + } + asw.Lock() + defer asw.Unlock() + + podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] + if !ok { + podMap = map[volumetypes.UniquePodName]types.UID{} + } + podMap[opts.PodName] = opts.PodUID + asw.foundDuringReconstruction[opts.VolumeName] = podMap + return nil +} + +func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + asw.RLock() + defer asw.RUnlock() + + volumeState := asw.GetVolumeMountState(volumeName, podName) + // only uncertain volumes are reconstructed + if volumeState != operationexecutor.VolumeMountUncertain { + return false + } + podMap, ok := asw.foundDuringReconstruction[volumeName] + if !ok { + return false + } + _, foundPod := podMap[podName] + return foundPod +} + func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error { return asw.AddPodToVolume(markVolumeOpts) } @@ -641,6 +677,12 @@ func (asw *actualStateOfWorld) DeletePodFromVolume( delete(asw.attachedVolumes[volumeName].mountedPods, podName) } + // if there were reconstructed volumes, we should remove them + _, podExists = asw.foundDuringReconstruction[volumeName] + if podExists { + delete(asw.foundDuringReconstruction[volumeName], podName) + } + return nil } @@ -661,6 +703,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro } delete(asw.attachedVolumes, volumeName) + delete(asw.foundDuringReconstruction, volumeName) return nil } @@ -739,7 +782,6 @@ func (asw *actualStateOfWorld) PodRemovedFromVolume( return false } } - return true } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 164b7cf5dff..a342d58ba9b 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -119,6 +119,7 @@ func NewReconciler( operationExecutor: operationExecutor, mounter: mounter, hostutil: hostutil, + skippedDuringReconstruction: map[v1.UniqueVolumeName]*globalVolumeInfo{}, volumePluginMgr: volumePluginMgr, kubeletPodsDir: kubeletPodsDir, timeOfLastSync: time.Time{}, @@ -138,6 +139,7 @@ type reconciler struct { mounter mount.Interface hostutil hostutil.HostUtils volumePluginMgr *volumepkg.VolumePluginMgr + skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo kubeletPodsDir string timeOfLastSync time.Time } @@ -175,6 +177,10 @@ func (rc *reconciler) reconcile() { // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() + + if len(rc.skippedDuringReconstruction) > 0 { + rc.processReconstructedVolumes() + } } func (rc *reconciler) unmountVolumes() { @@ -249,6 +255,63 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po } } +// processReconstructedVolumes checks volumes which were skipped during the reconstruction +// process because it was assumed that since these volumes were present in DSOW they would get +// mounted correctly and make it into ASOW. +// But if mount operation fails for some reason then we still need to mark the volume as uncertain +// and wait for the next reconciliation loop to deal with it. +func (rc *reconciler) processReconstructedVolumes() { + if rc.kubeClient != nil { + rc.updateDevicePath(rc.skippedDuringReconstruction) + } + for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction { + // check if volume is marked as attached to the node + // for now lets only process volumes which are at least known as attached to the node + // this should help with most volume types (including secret, configmap etc) + if !rc.actualStateOfWorld.VolumeExists(volumeName) { + klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName) + delete(rc.skippedDuringReconstruction, volumeName) + continue + } + uncertainVolumeCount := 0 + + for podName, volume := range glblVolumeInfo.podVolumes { + volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName) + // if volume is not mounted then lets mark volume mounted in uncertain state in ASOW + if volumeNotMounted { + err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain) + uncertainVolumeCount += 1 + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + } + } + + if uncertainVolumeCount > 0 { + // If the volume has device to mount, we mark its device as mounted. + if glblVolumeInfo.deviceMounter != nil || glblVolumeInfo.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(glblVolumeInfo) + if err != nil { + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName) + continue + } + currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName) + if currentMountState == operationexecutor.DeviceNotMounted { + err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath) + if err != nil { + klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName) + continue + } + klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName) + } + } + } + } + rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo) +} + func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens @@ -418,21 +481,6 @@ func (rc *reconciler) syncStates(kubeletPodDir string) { rc.cleanupMounts(volume) continue } - if volumeInDSW { - // Some pod needs the volume. And it exists on disk. Some previous - // kubelet must have created the directory, therefore it must have - // reported the volume as in use. Mark the volume as in use also in - // this new kubelet so reconcile() calls SetUp and re-mounts the - // volume if it's necessary. - volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) - klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - continue - } - // There is no pod that uses the volume. - if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { - klog.InfoS("Volume is in pending operation, skip cleaning up mounts") - } - klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) gvl := &globalVolumeInfo{ volumeName: reconstructedVolume.volumeName, volumeSpec: reconstructedVolume.volumeSpec, @@ -445,6 +493,22 @@ func (rc *reconciler) syncStates(kubeletPodDir string) { gvl = cachedInfo } gvl.addPodVolume(reconstructedVolume) + if volumeInDSW { + // Some pod needs the volume. And it exists on disk. Some previous + // kubelet must have created the directory, therefore it must have + // reported the volume as in use. Mark the volume as in use also in + // this new kubelet so reconcile() calls SetUp and re-mounts the + // volume if it's necessary. + volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) + rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl + klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + continue + } + // There is no pod that uses the volume. + if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { + klog.InfoS("Volume is in pending operation, skip cleaning up mounts") + } + klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) volumesNeedUpdate[reconstructedVolume.volumeName] = gvl } @@ -666,18 +730,7 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl continue } for _, volume := range gvl.podVolumes { - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMounted, - } - err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + err = rc.markVolumeState(volume, operationexecutor.VolumeMounted) if err != nil { klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) continue @@ -702,6 +755,22 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl return nil } +func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error { + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + return err +} + // getVolumesFromPodDir scans through the volumes directories under the given pod directory. // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path, // and volume spec name. diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 3cbc4e61a57..4f0e8cb9e0a 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -215,6 +215,10 @@ type ActualStateOfWorldMounterUpdater interface { // MarkForInUseExpansionError marks the volume to have in-use error during expansion. // volume expansion must not be retried for this volume MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) + + AddVolumeViaReconstruction(opts MarkVolumeOpts) error + + IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool } // ActualStateOfWorldAttacherUpdater defines a set of operations updating the diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 636aa4176d8..90404714b93 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -784,7 +784,8 @@ func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { if volumetypes.IsOperationFinishedError(mountError) && - actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain { + actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain && + !actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) { t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) if t != nil { klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error()) @@ -799,7 +800,6 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error()) } } - } func (og *operationGenerator) GenerateUnmountVolumeFunc(