From 989e391d08f569ee5193c94662fa9ef24a0e70d2 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Wed, 19 Oct 2022 13:27:12 +0200 Subject: [PATCH] Move all volume reconstruction code into separate files There is no code change, just moving code around and preparing for the subsequent commit. --- .../volumemanager/reconciler/reconciler.go | 458 +----------------- .../volumemanager/reconciler/reconstruct.go | 316 ++++++++++++ .../reconciler/reconstruct_common.go | 187 +++++++ 3 files changed, 505 insertions(+), 456 deletions(-) create mode 100644 pkg/kubelet/volumemanager/reconciler/reconstruct.go create mode 100644 pkg/kubelet/volumemanager/reconciler/reconstruct_common.go diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index de178350158..a01d6b6151d 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -20,35 +20,23 @@ limitations under the License. package reconciler import ( - "context" "fmt" - "io/fs" - "os" - "path" - "path/filepath" "time" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/klog/v2" - "k8s.io/mount-utils" - utilpath "k8s.io/utils/path" - utilstrings "k8s.io/utils/strings" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/kubelet/config" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" volumepkg "k8s.io/kubernetes/pkg/volume" - "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/hostutil" "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" - volumetypes "k8s.io/kubernetes/pkg/volume/util/types" + "k8s.io/mount-utils" ) // Reconciler runs a periodic loop to reconcile the desired state of the world 
@@ -415,448 +403,6 @@ func (rc *reconciler) unmountDetachDevices() { } } -// sync process tries to observe the real world by scanning all pods' volume directories from the disk. -// If the actual and desired state of worlds are not consistent with the observed world, it means that some -// mounted volumes are left out probably during kubelet restart. This process will reconstruct -// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction, -// it will try to clean up the mount paths with operation executor. -func (rc *reconciler) sync() { - defer rc.updateLastSyncTime() - rc.syncStates(rc.kubeletPodsDir) -} - -func (rc *reconciler) updateLastSyncTime() { - rc.timeOfLastSync = time.Now() -} - -func (rc *reconciler) StatesHasBeenSynced() bool { - return !rc.timeOfLastSync.IsZero() -} - -type podVolume struct { - podName volumetypes.UniquePodName - volumeSpecName string - volumePath string - pluginName string - volumeMode v1.PersistentVolumeMode -} - -type reconstructedVolume struct { - volumeName v1.UniqueVolumeName - podName volumetypes.UniquePodName - volumeSpec *volumepkg.Spec - outerVolumeSpecName string - pod *v1.Pod - volumeGidValue string - devicePath string - mounter volumepkg.Mounter - deviceMounter volumepkg.DeviceMounter - blockVolumeMapper volumepkg.BlockVolumeMapper -} - -// globalVolumeInfo stores reconstructed volume information -// for each pod that was using that volume. 
-type globalVolumeInfo struct { - volumeName v1.UniqueVolumeName - volumeSpec *volumepkg.Spec - devicePath string - mounter volumepkg.Mounter - deviceMounter volumepkg.DeviceMounter - blockVolumeMapper volumepkg.BlockVolumeMapper - podVolumes map[volumetypes.UniquePodName]*reconstructedVolume -} - -func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) { - if gvi.podVolumes == nil { - gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{} - } - gvi.podVolumes[rcv.podName] = rcv -} - -// syncStates scans the volume directories under the given pod directory. -// If the volume is not in desired state of world, this function will reconstruct -// the volume related information and put it in both the actual and desired state of worlds. -// For some volume plugins that cannot support reconstruction, it will clean up the existing -// mount points since the volume is no long needed (removed from desired state) -func (rc *reconciler) syncStates(kubeletPodDir string) { - // Get volumes information by reading the pod's directory - podVolumes, err := getVolumesFromPodDir(kubeletPodDir) - if err != nil { - klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") - return - } - volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo) - volumeNeedReport := []v1.UniqueVolumeName{} - for _, volume := range podVolumes { - if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { - klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - // There is nothing to reconstruct - continue - } - volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) - - reconstructedVolume, err := rc.reconstructVolume(volume) - if err != nil { - if volumeInDSW { - // Some pod needs the volume, don't clean it up and hope that - // reconcile() calls SetUp and reconstructs 
the volume in ASW. - klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - continue - } - // No pod needs the volume. - klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err) - rc.cleanupMounts(volume) - continue - } - gvl := &globalVolumeInfo{ - volumeName: reconstructedVolume.volumeName, - volumeSpec: reconstructedVolume.volumeSpec, - devicePath: reconstructedVolume.devicePath, - deviceMounter: reconstructedVolume.deviceMounter, - blockVolumeMapper: reconstructedVolume.blockVolumeMapper, - mounter: reconstructedVolume.mounter, - } - if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok { - gvl = cachedInfo - } - gvl.addPodVolume(reconstructedVolume) - if volumeInDSW { - // Some pod needs the volume. And it exists on disk. Some previous - // kubelet must have created the directory, therefore it must have - // reported the volume as in use. Mark the volume as in use also in - // this new kubelet so reconcile() calls SetUp and re-mounts the - // volume if it's necessary. - volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) - rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl - klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - continue - } - // There is no pod that uses the volume. 
- if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { - klog.InfoS("Volume is in pending operation, skip cleaning up mounts") - } - klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) - volumesNeedUpdate[reconstructedVolume.volumeName] = gvl - } - - if len(volumesNeedUpdate) > 0 { - if err = rc.updateStates(volumesNeedUpdate); err != nil { - klog.ErrorS(err, "Error occurred during reconstruct volume from disk") - } - } - if len(volumeNeedReport) > 0 { - rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport) - } -} - -func (rc *reconciler) cleanupMounts(volume podVolume) { - klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - mountedVolume := operationexecutor.MountedVolume{ - PodName: volume.podName, - VolumeName: v1.UniqueVolumeName(volume.volumeSpecName), - InnerVolumeSpecName: volume.volumeSpecName, - PluginName: volume.pluginName, - PodUID: types.UID(volume.podName), - } - // TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add - // to unmount both volume and device in the same routine. 
- err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir) - if err != nil { - klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error()) - return - } -} - -// Reconstruct volume data structure by reading the pod's volume directories -func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) { - // plugin initializations - plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName) - if err != nil { - return nil, err - } - - // Create pod object - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID(volume.podName), - }, - } - mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName) - if err != nil { - return nil, err - } - if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil { - return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID) - } - - volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation( - volume.volumeMode, - plugin, - mapperPlugin, - pod.UID, - volume.podName, - volume.volumeSpecName, - volume.volumePath, - volume.pluginName) - if err != nil { - return nil, err - } - - // We have to find the plugins by volume spec (NOT by plugin name) here - // in order to correctly reconstruct ephemeral volume types. - // Searching by spec checks whether the volume is actually attachable - // (i.e. has a PV) whereas searching by plugin name can only tell whether - // the plugin supports attachable volumes. 
- attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) - if err != nil { - return nil, err - } - deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec) - if err != nil { - return nil, err - } - - var uniqueVolumeName v1.UniqueVolumeName - if attachablePlugin != nil || deviceMountablePlugin != nil { - uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec) - if err != nil { - return nil, err - } - } else { - uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec) - } - - var volumeMapper volumepkg.BlockVolumeMapper - var volumeMounter volumepkg.Mounter - var deviceMounter volumepkg.DeviceMounter - // Path to the mount or block device to check - var checkPath string - - if volume.volumeMode == v1.PersistentVolumeBlock { - var newMapperErr error - volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper( - volumeSpec, - pod, - volumepkg.VolumeOptions{}) - if newMapperErr != nil { - return nil, fmt.Errorf( - "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", - uniqueVolumeName, - volumeSpec.Name(), - volume.podName, - pod.UID, - newMapperErr) - } - mapDir, linkName := volumeMapper.GetPodDeviceMapPath() - checkPath = filepath.Join(mapDir, linkName) - } else { - var err error - volumeMounter, err = plugin.NewMounter( - volumeSpec, - pod, - volumepkg.VolumeOptions{}) - if err != nil { - return nil, fmt.Errorf( - "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", - uniqueVolumeName, - volumeSpec.Name(), - volume.podName, - pod.UID, - err) - } - checkPath = volumeMounter.GetPath() - if deviceMountablePlugin != nil { - deviceMounter, err = deviceMountablePlugin.NewDeviceMounter() - if err != nil { - return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", - uniqueVolumeName, - 
volumeSpec.Name(), - volume.podName, - pod.UID, - err) - } - } - } - - // Check existence of mount point for filesystem volume or symbolic link for block volume - isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin) - if checkErr != nil { - return nil, checkErr - } - // If mount or symlink doesn't exist, volume reconstruction should be failed - if !isExist { - return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName) - } - - reconstructedVolume := &reconstructedVolume{ - volumeName: uniqueVolumeName, - podName: volume.podName, - volumeSpec: volumeSpec, - // volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used - // for volume cleanup. - // in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information. - // See issue #103143 and its fix for details. - outerVolumeSpecName: volume.volumeSpecName, - pod: pod, - deviceMounter: deviceMounter, - volumeGidValue: "", - // devicePath is updated during updateStates() by checking node status's VolumesAttached data. - // TODO: get device path directly from the volume mount path. - devicePath: "", - mounter: volumeMounter, - blockVolumeMapper: volumeMapper, - } - return reconstructedVolume, nil -} - -// updateDevicePath gets the node status to retrieve volume device path information. 
-func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) { - node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) - if fetchErr != nil { - klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error") - } else { - for _, attachedVolume := range node.Status.VolumesAttached { - if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists { - volume.devicePath = attachedVolume.DevicePath - volumesNeedUpdate[attachedVolume.Name] = volume - klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath) - } - } - } -} - -// getDeviceMountPath returns device mount path for block volume which -// implements BlockVolumeMapper or filesystem volume which implements -// DeviceMounter -func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) { - if gvi.blockVolumeMapper != nil { - // for block gvi, we return its global map path - return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec) - } else if gvi.deviceMounter != nil { - // for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter - return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec) - } else { - return "", fmt.Errorf("blockVolumeMapper or deviceMounter required") - } -} - -func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error { - // Get the node status to retrieve volume device path information. - // Skip reporting devicePath in node objects if kubeClient is nil. - // In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc. 
- if rc.kubeClient != nil { - rc.updateDevicePath(volumesNeedUpdate) - } - - for _, gvl := range volumesNeedUpdate { - err := rc.actualStateOfWorld.MarkVolumeAsAttached( - //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 - gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath) - if err != nil { - klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) - continue - } - for _, volume := range gvl.podVolumes { - err = rc.markVolumeState(volume, operationexecutor.VolumeMounted) - if err != nil { - klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) - continue - } - klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) - } - // If the volume has device to mount, we mark its device as mounted. - if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { - deviceMountPath, err := getDeviceMountPath(gvl) - if err != nil { - klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) - continue - } - // TODO(jsafrane): add reconstructed SELinux context - err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "") - if err != nil { - klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName) - continue - } - klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName) - } - } - return nil -} - -func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error { - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: 
volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: volumeState, - } - err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) - return err -} - -// getVolumesFromPodDir scans through the volumes directories under the given pod directory. -// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path, -// and volume spec name. -func getVolumesFromPodDir(podDir string) ([]podVolume, error) { - podsDirInfo, err := os.ReadDir(podDir) - if err != nil { - return nil, err - } - volumes := []podVolume{} - for i := range podsDirInfo { - if !podsDirInfo[i].IsDir() { - continue - } - podName := podsDirInfo[i].Name() - podDir := path.Join(podDir, podName) - - // Find filesystem volume information - // ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName} - volumesDirs := map[v1.PersistentVolumeMode]string{ - v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName), - } - // Find block volume information - // ex. 
block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName} - volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName) - - for volumeMode, volumesDir := range volumesDirs { - var volumesDirInfo []fs.DirEntry - if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil { - // Just skip the loop because given volumesDir doesn't exist depending on volumeMode - continue - } - for _, volumeDir := range volumesDirInfo { - pluginName := volumeDir.Name() - volumePluginPath := path.Join(volumesDir, pluginName) - volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath) - if err != nil { - klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath) - continue - } - unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName) - for _, volumeName := range volumePluginDirs { - volumePath := path.Join(volumePluginPath, volumeName) - klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath) - volumes = append(volumes, podVolume{ - podName: volumetypes.UniquePodName(podName), - volumeSpecName: volumeName, - volumePath: volumePath, - pluginName: unescapePluginName, - volumeMode: volumeMode, - }) - } - } - } - } - klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes) - return volumes, nil -} - // ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. 
func isExpectedError(err error) bool { return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err) diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct.go b/pkg/kubelet/volumemanager/reconciler/reconstruct.go new file mode 100644 index 00000000000..ce503c5a824 --- /dev/null +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct.go @@ -0,0 +1,316 @@ +/* +Copyright 2022 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconciler + +import ( + "context" + "fmt" + "path/filepath" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + volumepkg "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" +) + +// sync process tries to observe the real world by scanning all pods' volume directories from the disk. +// If the actual and desired state of worlds are not consistent with the observed world, it means that some +// mounted volumes are left out probably during kubelet restart. This process will reconstruct +// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction, +// it will try to clean up the mount paths with operation executor. 
+func (rc *reconciler) sync() { + defer rc.updateLastSyncTime() + rc.syncStates(rc.kubeletPodsDir) +} + +// syncStates scans the volume directories under the given pod directory. +// If the volume is not in desired state of world, this function will reconstruct +// the volume related information and put it in both the actual and desired state of worlds. +// For some volume plugins that cannot support reconstruction, it will clean up the existing +// mount points since the volume is no long needed (removed from desired state) +func (rc *reconciler) syncStates(kubeletPodDir string) { + // Get volumes information by reading the pod's directory + podVolumes, err := getVolumesFromPodDir(kubeletPodDir) + if err != nil { + klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") + return + } + volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo) + volumeNeedReport := []v1.UniqueVolumeName{} + for _, volume := range podVolumes { + if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { + klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + // There is nothing to reconstruct + continue + } + volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) + + reconstructedVolume, err := rc.reconstructVolume(volume) + if err != nil { + if volumeInDSW { + // Some pod needs the volume, don't clean it up and hope that + // reconcile() calls SetUp and reconstructs the volume in ASW. + klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + continue + } + // No pod needs the volume. 
+ klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err) + rc.cleanupMounts(volume) + continue + } + gvl := &globalVolumeInfo{ + volumeName: reconstructedVolume.volumeName, + volumeSpec: reconstructedVolume.volumeSpec, + devicePath: reconstructedVolume.devicePath, + deviceMounter: reconstructedVolume.deviceMounter, + blockVolumeMapper: reconstructedVolume.blockVolumeMapper, + mounter: reconstructedVolume.mounter, + } + if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok { + gvl = cachedInfo + } + gvl.addPodVolume(reconstructedVolume) + if volumeInDSW { + // Some pod needs the volume. And it exists on disk. Some previous + // kubelet must have created the directory, therefore it must have + // reported the volume as in use. Mark the volume as in use also in + // this new kubelet so reconcile() calls SetUp and re-mounts the + // volume if it's necessary. + volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) + rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl + klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + continue + } + // There is no pod that uses the volume. 
+ if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { + klog.InfoS("Volume is in pending operation, skip cleaning up mounts") + } + klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) + volumesNeedUpdate[reconstructedVolume.volumeName] = gvl + } + + if len(volumesNeedUpdate) > 0 { + if err = rc.updateStates(volumesNeedUpdate); err != nil { + klog.ErrorS(err, "Error occurred during reconstruct volume from disk") + } + } + if len(volumeNeedReport) > 0 { + rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport) + } +} + +// Reconstruct volume data structure by reading the pod's volume directories +func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) { + // plugin initializations + plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName) + if err != nil { + return nil, err + } + + // Create pod object + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(volume.podName), + }, + } + mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName) + if err != nil { + return nil, err + } + if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil { + return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID) + } + + volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation( + volume.volumeMode, + plugin, + mapperPlugin, + pod.UID, + volume.podName, + volume.volumeSpecName, + volume.volumePath, + volume.pluginName) + if err != nil { + return nil, err + } + + // We have to find the plugins by volume spec (NOT by plugin name) here + // in order to correctly reconstruct ephemeral volume types. 
+ // Searching by spec checks whether the volume is actually attachable + // (i.e. has a PV) whereas searching by plugin name can only tell whether + // the plugin supports attachable volumes. + attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec) + if err != nil { + return nil, err + } + deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec) + if err != nil { + return nil, err + } + + var uniqueVolumeName v1.UniqueVolumeName + if attachablePlugin != nil || deviceMountablePlugin != nil { + uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec) + if err != nil { + return nil, err + } + } else { + uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec) + } + + var volumeMapper volumepkg.BlockVolumeMapper + var volumeMounter volumepkg.Mounter + var deviceMounter volumepkg.DeviceMounter + // Path to the mount or block device to check + var checkPath string + + if volume.volumeMode == v1.PersistentVolumeBlock { + var newMapperErr error + volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper( + volumeSpec, + pod, + volumepkg.VolumeOptions{}) + if newMapperErr != nil { + return nil, fmt.Errorf( + "reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", + uniqueVolumeName, + volumeSpec.Name(), + volume.podName, + pod.UID, + newMapperErr) + } + mapDir, linkName := volumeMapper.GetPodDeviceMapPath() + checkPath = filepath.Join(mapDir, linkName) + } else { + var err error + volumeMounter, err = plugin.NewMounter( + volumeSpec, + pod, + volumepkg.VolumeOptions{}) + if err != nil { + return nil, fmt.Errorf( + "reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", + uniqueVolumeName, + volumeSpec.Name(), + volume.podName, + pod.UID, + err) + } + checkPath = volumeMounter.GetPath() + if deviceMountablePlugin != nil { + deviceMounter, err = 
deviceMountablePlugin.NewDeviceMounter() + if err != nil { + return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", + uniqueVolumeName, + volumeSpec.Name(), + volume.podName, + pod.UID, + err) + } + } + } + + // Check existence of mount point for filesystem volume or symbolic link for block volume + isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin) + if checkErr != nil { + return nil, checkErr + } + // If mount or symlink doesn't exist, volume reconstruction should be failed + if !isExist { + return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName) + } + + reconstructedVolume := &reconstructedVolume{ + volumeName: uniqueVolumeName, + podName: volume.podName, + volumeSpec: volumeSpec, + // volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used + // for volume cleanup. + // in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information. + // See issue #103143 and its fix for details. + outerVolumeSpecName: volume.volumeSpecName, + pod: pod, + deviceMounter: deviceMounter, + volumeGidValue: "", + // devicePath is updated during updateStates() by checking node status's VolumesAttached data. + // TODO: get device path directly from the volume mount path. + devicePath: "", + mounter: volumeMounter, + blockVolumeMapper: volumeMapper, + } + return reconstructedVolume, nil +} + +// updateDevicePath gets the node status to retrieve volume device path information. 
+func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) { + node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) + if fetchErr != nil { + klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error") + } else { + for _, attachedVolume := range node.Status.VolumesAttached { + if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists { + volume.devicePath = attachedVolume.DevicePath + volumesNeedUpdate[attachedVolume.Name] = volume + klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath) + } + } + } +} + +func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error { + // Get the node status to retrieve volume device path information. + // Skip reporting devicePath in node objects if kubeClient is nil. + // In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc. 
+ if rc.kubeClient != nil { + rc.updateDevicePath(volumesNeedUpdate) + } + + for _, gvl := range volumesNeedUpdate { + err := rc.actualStateOfWorld.MarkVolumeAsAttached( + //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 + gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath) + if err != nil { + klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) + continue + } + for _, volume := range gvl.podVolumes { + err = rc.markVolumeState(volume, operationexecutor.VolumeMounted) + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + } + // If the volume has device to mount, we mark its device as mounted. + if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(gvl) + if err != nil { + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) + continue + } + // TODO(jsafrane): add reconstructed SELinux context + err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "") + if err != nil { + klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName) + continue + } + klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName) + } + } + return nil +} diff --git a/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go b/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go new file mode 100644 index 00000000000..6088fd5b0b6 --- /dev/null +++ b/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go @@ -0,0 +1,187 @@ +/* +Copyright 2022 The Kubernetes Authors. 
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
	"fmt"
	"io/fs"
	"os"
	"path"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/config"
	volumepkg "k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
	utilpath "k8s.io/utils/path"
	utilstrings "k8s.io/utils/strings"
)

// podVolume represents a single volume directory found on disk under a pod's
// directory during reconstruction, before it is matched to a volume plugin.
type podVolume struct {
	// podName is the pod's UID, taken from the per-pod directory name.
	podName volumetypes.UniquePodName
	// volumeSpecName is the volume directory name under the plugin directory.
	volumeSpecName string
	// volumePath is the full on-disk path of the volume directory.
	volumePath string
	// pluginName is the unescaped qualified volume plugin name.
	pluginName string
	// volumeMode says whether the volume was found under the filesystem
	// (volumes/) or the block (volumeDevices/) subdirectory of the pod dir.
	volumeMode v1.PersistentVolumeMode
}

// reconstructedVolume holds the information reconstructed for one volume of
// one pod, including the plugin objects (mounter, deviceMounter,
// blockVolumeMapper) recovered for it.
type reconstructedVolume struct {
	volumeName          v1.UniqueVolumeName
	podName             volumetypes.UniquePodName
	volumeSpec          *volumepkg.Spec
	outerVolumeSpecName string
	pod                 *v1.Pod
	volumeGidValue      string
	devicePath          string
	mounter             volumepkg.Mounter
	deviceMounter       volumepkg.DeviceMounter
	blockVolumeMapper   volumepkg.BlockVolumeMapper
}

// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
	volumeName        v1.UniqueVolumeName
	volumeSpec        *volumepkg.Spec
	devicePath        string
	mounter           volumepkg.Mounter
	deviceMounter     volumepkg.DeviceMounter
	blockVolumeMapper volumepkg.BlockVolumeMapper
	// podVolumes indexes the per-pod reconstructed volumes by pod name.
	podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}

// updateLastSyncTime records the current time as the moment the last
// reconstruction sync finished.
func (rc *reconciler) updateLastSyncTime() {
	rc.timeOfLastSync = time.Now()
}

// StatesHasBeenSynced returns true once at least one sync has completed,
// i.e. timeOfLastSync has been set.
func (rc *reconciler) StatesHasBeenSynced() bool {
	return !rc.timeOfLastSync.IsZero()
}

// addPodVolume records rcv as the reconstructed volume of its pod,
// lazily allocating the podVolumes map on first use.
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
	if gvi.podVolumes == nil {
		gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
	}
	gvi.podVolumes[rcv.podName] = rcv
}

// cleanupMounts tears down the mount of a volume that was found on disk but
// has no matching entry in the desired state of the world. It issues an
// UnmountVolume operation through the operation executor; errors are only
// logged.
func (rc *reconciler) cleanupMounts(volume podVolume) {
	klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
	mountedVolume := operationexecutor.MountedVolume{
		PodName:             volume.podName,
		VolumeName:          v1.UniqueVolumeName(volume.volumeSpecName),
		InnerVolumeSpecName: volume.volumeSpecName,
		PluginName:          volume.pluginName,
		// podName is the on-disk pod directory name, i.e. the pod UID, so it
		// converts to types.UID directly.
		PodUID: types.UID(volume.podName),
	}
	// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
	// to unmount both volume and device in the same routine.
	err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
	if err != nil {
		klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
		return
	}
}

// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter. It returns an error when the volume has neither.
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
	if gvi.blockVolumeMapper != nil {
		// for block gvi, we return its global map path
		return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
	} else if gvi.deviceMounter != nil {
		// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
		return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
	} else {
		return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
	}
}

// markVolumeState records the reconstructed volume in the actual state of the
// world as mounted by its pod, with the given volume mount state.
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
	markVolumeOpts := operationexecutor.MarkVolumeOpts{
		PodName:             volume.podName,
		PodUID:              types.UID(volume.podName),
		VolumeName:          volume.volumeName,
		Mounter:             volume.mounter,
		BlockVolumeMapper:   volume.blockVolumeMapper,
		OuterVolumeSpecName: volume.outerVolumeSpecName,
		VolumeGidVolume:     volume.volumeGidValue,
		VolumeSpec:          volume.volumeSpec,
		VolumeMountState:    volumeState,
	}
	err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
	return err
}

// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
	podsDirInfo, err := os.ReadDir(podDir)
	if err != nil {
		return nil, err
	}
	volumes := []podVolume{}
	for i := range podsDirInfo {
		if !podsDirInfo[i].IsDir() {
			continue
		}
		podName := podsDirInfo[i].Name()
		// NOTE: shadows the outer podDir parameter; inside this iteration
		// podDir refers to this pod's own directory.
		podDir := path.Join(podDir, podName)

		// Find filesystem volume information
		// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
		volumesDirs := map[v1.PersistentVolumeMode]string{
			v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
		}
		// Find block volume information
		// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
		volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)

		for volumeMode, volumesDir := range volumesDirs {
			var volumesDirInfo []fs.DirEntry
			if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
				// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
				continue
			}
			for _, volumeDir := range volumesDirInfo {
				pluginName := volumeDir.Name()
				volumePluginPath := path.Join(volumesDir, pluginName)
				volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
				if err != nil {
					klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
					continue
				}
				// The directory name is an escaped qualified plugin name;
				// unescape it to recover the real plugin name.
				unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
				for _, volumeName := range volumePluginDirs {
					volumePath := path.Join(volumePluginPath, volumeName)
					klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
					volumes = append(volumes, podVolume{
						podName:        volumetypes.UniquePodName(podName),
						volumeSpecName: volumeName,
						volumePath:     volumePath,
						pluginName:     unescapePluginName,
						volumeMode:     volumeMode,
					})
				}
			}
		}
	}
	klog.V(4).InfoS("Get volumes from pod directory", "path", podDir,
		"volumes", volumes)
	return volumes, nil
}