diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index ecc1d23a603..da30ead630f 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -171,6 +171,12 @@ type ActualStateOfWorld interface { // SyncReconstructedVolume check the volume.outerVolumeSpecName in asw and // the one populated from dsw , if they do not match, update this field from the value from dsw. SyncReconstructedVolume(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, outerVolumeSpecName string) + + // UpdateReconstructedDevicePath updates devicePath of a reconstructed volume + // from Node.Status.VolumesAttached. The ASW is updated only when the volume is still + // uncertain. If the volume got mounted in the meantime, its devicePath must have + // been fixed by such an update. + UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string) } // MountedVolume represents a volume that has successfully been mounted to a pod. @@ -501,6 +507,24 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted( return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "", "") } +func (asw *actualStateOfWorld) UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string) { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if !volumeExists { + return + } + if volumeObj.deviceMountState != operationexecutor.DeviceMountUncertain { + // Reconciler must have updated volume state, i.e. when a pod uses the volume and + // succeeded mounting the volume. Such update has fixed the device path. 
+ return + } + + volumeObj.devicePath = devicePath + asw.attachedVolumes[volumeName] = volumeObj +} + func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState { asw.RLock() defer asw.RUnlock() @@ -636,7 +660,16 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M } podObj, podExists := volumeObj.mountedPods[podName] - if !podExists { + + updateUncertainVolume := false + if podExists { + // Update uncertain volumes - the new markVolumeOpts may have updated information. + // Especially reconstructed volumes (marked as uncertain during reconstruction) need + // an update. + updateUncertainVolume = utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain + } + if !podExists || updateUncertainVolume { + // Add new mountedPod or update existing one. podObj = mountedPod{ podName: podName, podUID: podUID, diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index 7b0c7536356..8e75dd28f32 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -153,7 +153,10 @@ func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, return done, nil }, stopCh) dswp.hasAddedPodsLock.Lock() - dswp.hasAddedPods = true + if !dswp.hasAddedPods { + klog.InfoS("Finished populating initial desired state of world") + dswp.hasAddedPods = true + } dswp.hasAddedPodsLock.Unlock() wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index a01d6b6151d..df47114c498 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ 
b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -23,13 +23,14 @@ import ( "fmt" "time" - "k8s.io/apimachinery/pkg/api/resource" - v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" volumepkg "k8s.io/kubernetes/pkg/volume" @@ -121,6 +122,9 @@ func NewReconciler( volumePluginMgr: volumePluginMgr, kubeletPodsDir: kubeletPodsDir, timeOfLastSync: time.Time{}, + volumesFailedReconstruction: make([]podVolume, 0), + volumesNeedDevicePath: make([]v1.UniqueVolumeName, 0), + volumesNeedReportedInUse: make([]v1.UniqueVolumeName, 0), } } @@ -140,9 +144,17 @@ type reconciler struct { skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo kubeletPodsDir string timeOfLastSync time.Time + volumesFailedReconstruction []podVolume + volumesNeedDevicePath []v1.UniqueVolumeName + volumesNeedReportedInUse []v1.UniqueVolumeName } func (rc *reconciler) Run(stopCh <-chan struct{}) { + if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) { + rc.runNew(stopCh) + return + } + wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct_new.go b/pkg/kubelet/volumemanager/reconciler/reconstruct_new.go new file mode 100644 index 00000000000..11f776484c6 --- /dev/null +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct_new.go @@ -0,0 +1,243 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" +) + +// TODO: move to reconstruct.go and remove old code there. + +// TODO: Replace Run() when SELinuxMountReadWriteOncePod is GA +func (rc *reconciler) runNew(stopCh <-chan struct{}) { + rc.reconstructVolumes() + klog.InfoS("Reconciler: start to sync state") + wait.Until(rc.reconcileNew, rc.loopSleepDuration, stopCh) +} + +func (rc *reconciler) reconcileNew() { + readyToUnmount := rc.readyToUnmount() + if readyToUnmount { + // Unmounts are triggered before mounts so that a volume that was + // referenced by a pod that was deleted and is now referenced by another + // pod is unmounted from the first pod before being mounted to the new + // pod. + rc.unmountVolumes() + } + + // Next we mount required volumes. This function could also trigger + // attach if kubelet is responsible for attaching volumes. + // If underlying PVC was resized while in-use then this function also handles volume + // resizing. + rc.mountOrAttachVolumes() + + // Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume + // that is still needed, but it did not reach DSW yet. + if readyToUnmount { + // Ensure devices that should be detached/unmounted are detached/unmounted. + rc.unmountDetachDevices() + + // Clean up any orphan volumes that failed reconstruction. 
+ rc.cleanOrphanVolumes() + } + + if len(rc.volumesNeedDevicePath) != 0 { + rc.updateReconstructedDevicePaths() + } + + if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() { + // Once DSW is populated, mark all reconstructed as reported in node.status, + // so they can proceed with MountDevice / SetUp. + rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse) + rc.volumesNeedReportedInUse = nil + } +} + +// readyToUnmount returns true when reconciler can start unmounting volumes. +func (rc *reconciler) readyToUnmount() bool { + // During kubelet startup, all volumes present on disk are added as uncertain to ASW. + // Allow unmount only when DSW is fully populated to prevent unmounting volumes that + // did not reach DSW yet. + if !rc.populatorHasAddedPods() { + return false + } + + // Allow unmount only when ASW device paths were corrected from node.status to prevent + // calling unmount with a wrong devicePath. + if len(rc.volumesNeedDevicePath) != 0 { + return false + } + return true +} + +// reconstructVolumes tries to reconstruct the actual state of world by scanning all pods' volume +// directories from the disk. For the volumes that cannot support or fail reconstruction, it will +// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld +// is populated. 
+func (rc *reconciler) reconstructVolumes() { + defer rc.updateLastSyncTime() + // Get volumes information by reading the pod's directory + podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir) + if err != nil { + klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") + return + } + reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo) + reconstructedVolumeNames := []v1.UniqueVolumeName{} + for _, volume := range podVolumes { + if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { + klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + // There is nothing to reconstruct + continue + } + reconstructedVolume, err := rc.reconstructVolume(volume) + if err != nil { + klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err) + // We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned. 
+ rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume) + continue + } + klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + gvl := &globalVolumeInfo{ + volumeName: reconstructedVolume.volumeName, + volumeSpec: reconstructedVolume.volumeSpec, + devicePath: reconstructedVolume.devicePath, + deviceMounter: reconstructedVolume.deviceMounter, + blockVolumeMapper: reconstructedVolume.blockVolumeMapper, + mounter: reconstructedVolume.mounter, + } + if cachedInfo, ok := reconstructedVolumes[reconstructedVolume.volumeName]; ok { + gvl = cachedInfo + } + gvl.addPodVolume(reconstructedVolume) + + reconstructedVolumeNames = append(reconstructedVolumeNames, reconstructedVolume.volumeName) + reconstructedVolumes[reconstructedVolume.volumeName] = gvl + } + + if len(reconstructedVolumes) > 0 { + // Add the volumes to ASW + rc.updateStatesNew(reconstructedVolumes) + + // The reconstructed volumes are mounted, hence a previous kubelet must have already put it into node.status.volumesInUse. + // Remember to update DSW with this information. 
+ rc.volumesNeedReportedInUse = reconstructedVolumeNames + // Remember to update devicePath from node.status.volumesAttached + rc.volumesNeedDevicePath = reconstructedVolumeNames + } + klog.V(2).InfoS("Volume reconstruction finished") +} + +func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) { + for _, gvl := range reconstructedVolumes { + err := rc.actualStateOfWorld.MarkVolumeAsAttached( + //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 + gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath) + if err != nil { + klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) + continue + } + for _, volume := range gvl.podVolumes { + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err = rc.actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts) + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + } + // If the volume has device to mount, we mark its device as mounted. 
+ if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(gvl) + if err != nil { + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) + continue + } + err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, "") + if err != nil { + klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) + continue + } + klog.V(4).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath) + } + } +} + +// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction. +func (rc *reconciler) cleanOrphanVolumes() { + if len(rc.volumesFailedReconstruction) == 0 { + return + } + + for _, volume := range rc.volumesFailedReconstruction { + if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { + // Some pod needs the volume, don't clean it up and hope that + // reconcile() calls SetUp and reconstructs the volume in ASW. + klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + continue + } + klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + rc.cleanupMounts(volume) + } + + klog.V(2).InfoS("Orphan volume cleanup finished") + // Clean the cache, cleanup is one shot operation. + rc.volumesFailedReconstruction = make([]podVolume, 0) +} + +// updateReconstructedDevicePaths tries to fill devicePaths of reconstructed volumes from +// node.Status.VolumesAttached. This can be done only after connection to the API +// server is established, i.e. it can't be part of reconstructVolumes(). 
+func (rc *reconciler) updateReconstructedDevicePaths() { + klog.V(4).InfoS("Updating reconstructed devicePaths") + + node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) + if fetchErr != nil { + // This may repeat few times per second until kubelet is able to read its own status for the first time. + klog.ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths") + return + } + + for _, volumeID := range rc.volumesNeedDevicePath { + for _, attachedVolume := range node.Status.VolumesAttached { + if volumeID != attachedVolume.Name { + continue + } + rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath) + klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath) + } + } + klog.V(2).InfoS("DevicePaths of reconstructed volumes updated") + rc.volumesNeedDevicePath = nil +}