Move all volume reconstruction code into separate files

There is no code change, just moving code around and preparing for the
subsequent commit.
This commit is contained in:
Jan Safranek 2022-10-19 13:27:12 +02:00
parent 07315d10b3
commit 989e391d08
3 changed files with 505 additions and 456 deletions

View File

@ -20,35 +20,23 @@ limitations under the License.
package reconciler
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
utilpath "k8s.io/utils/path"
utilstrings "k8s.io/utils/strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/mount-utils"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
@ -415,448 +403,6 @@ func (rc *reconciler) unmountDetachDevices() {
}
}
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
}
func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) StatesHasBeenSynced() bool {
return !rc.timeOfLastSync.IsZero()
}
type podVolume struct {
podName volumetypes.UniquePodName
volumeSpecName string
volumePath string
pluginName string
volumeMode v1.PersistentVolumeMode
}
type reconstructedVolume struct {
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
volumeGidValue string
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
volumeName v1.UniqueVolumeName
volumeSpec *volumepkg.Spec
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
if gvi.podVolumes == nil {
gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
}
gvi.podVolumes[rcv.podName] = rcv
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates(kubeletPodDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// No pod needs the volume.
klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
rc.cleanupMounts(volume)
continue
}
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
}
}
func (rc *reconciler) cleanupMounts(volume podVolume) {
klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
InnerVolumeSpecName: volume.volumeSpecName,
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
return
}
}
// Reconstruct volume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
// plugin initializations
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
// Create pod object
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(volume.podName),
},
}
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
}
volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
volume.volumeMode,
plugin,
mapperPlugin,
pod.UID,
volume.podName,
volume.volumeSpecName,
volume.volumePath,
volume.pluginName)
if err != nil {
return nil, err
}
// We have to find the plugins by volume spec (NOT by plugin name) here
// in order to correctly reconstruct ephemeral volume types.
// Searching by spec checks whether the volume is actually attachable
// (i.e. has a PV) whereas searching by plugin name can only tell whether
// the plugin supports attachable volumes.
attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
var uniqueVolumeName v1.UniqueVolumeName
if attachablePlugin != nil || deviceMountablePlugin != nil {
uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
return nil, err
}
} else {
uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
}
var volumeMapper volumepkg.BlockVolumeMapper
var volumeMounter volumepkg.Mounter
var deviceMounter volumepkg.DeviceMounter
// Path to the mount or block device to check
var checkPath string
if volume.volumeMode == v1.PersistentVolumeBlock {
var newMapperErr error
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if newMapperErr != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
newMapperErr)
}
mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
checkPath = filepath.Join(mapDir, linkName)
} else {
var err error
volumeMounter, err = plugin.NewMounter(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if err != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
checkPath = volumeMounter.GetPath()
if deviceMountablePlugin != nil {
deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
if err != nil {
return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
}
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
return nil, checkErr
}
// If mount or symlink doesn't exist, volume reconstruction should be failed
if !isExist {
return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
}
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information.
// See issue #103143 and its fix for details.
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
deviceMounter: deviceMounter,
volumeGidValue: "",
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
}
return reconstructedVolume, nil
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
}
}
}
}
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
if gvi.blockVolumeMapper != nil {
// for block gvi, we return its global map path
return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
} else if gvi.deviceMounter != nil {
// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
if rc.kubeClient != nil {
rc.updateDevicePath(volumesNeedUpdate)
}
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
// If the volume has device to mount, we mark its device as mounted.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
// TODO(jsafrane): add reconstructed SELinux context
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
}
}
return nil
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
podsDirInfo, err := os.ReadDir(podDir)
if err != nil {
return nil, err
}
volumes := []podVolume{}
for i := range podsDirInfo {
if !podsDirInfo[i].IsDir() {
continue
}
podName := podsDirInfo[i].Name()
podDir := path.Join(podDir, podName)
// Find filesystem volume information
// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
volumesDirs := map[v1.PersistentVolumeMode]string{
v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
}
// Find block volume information
// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
for volumeMode, volumesDir := range volumesDirs {
var volumesDirInfo []fs.DirEntry
if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
continue
}
for _, volumeDir := range volumesDirInfo {
pluginName := volumeDir.Name()
volumePluginPath := path.Join(volumesDir, pluginName)
volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
if err != nil {
klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
continue
}
unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
for _, volumeName := range volumePluginDirs {
volumePath := path.Join(volumePluginPath, volumeName)
klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
volumePath: volumePath,
pluginName: unescapePluginName,
volumeMode: volumeMode,
})
}
}
}
}
klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes)
return volumes, nil
}
// ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
func isExpectedError(err error) bool {
return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)

View File

@ -0,0 +1,316 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"context"
"fmt"
"path/filepath"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates(kubeletPodDir string) {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// No pod needs the volume.
klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
rc.cleanupMounts(volume)
continue
}
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
}
}
// Reconstruct volume data structure by reading the pod's volume directories
func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) {
// plugin initializations
plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
// Create pod object
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(volume.podName),
},
}
mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
if err != nil {
return nil, err
}
if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
}
volumeSpec, err := rc.operationExecutor.ReconstructVolumeOperation(
volume.volumeMode,
plugin,
mapperPlugin,
pod.UID,
volume.podName,
volume.volumeSpecName,
volume.volumePath,
volume.pluginName)
if err != nil {
return nil, err
}
// We have to find the plugins by volume spec (NOT by plugin name) here
// in order to correctly reconstruct ephemeral volume types.
// Searching by spec checks whether the volume is actually attachable
// (i.e. has a PV) whereas searching by plugin name can only tell whether
// the plugin supports attachable volumes.
attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
if err != nil {
return nil, err
}
var uniqueVolumeName v1.UniqueVolumeName
if attachablePlugin != nil || deviceMountablePlugin != nil {
uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
return nil, err
}
} else {
uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
}
var volumeMapper volumepkg.BlockVolumeMapper
var volumeMounter volumepkg.Mounter
var deviceMounter volumepkg.DeviceMounter
// Path to the mount or block device to check
var checkPath string
if volume.volumeMode == v1.PersistentVolumeBlock {
var newMapperErr error
volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if newMapperErr != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
newMapperErr)
}
mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
checkPath = filepath.Join(mapDir, linkName)
} else {
var err error
volumeMounter, err = plugin.NewMounter(
volumeSpec,
pod,
volumepkg.VolumeOptions{})
if err != nil {
return nil, fmt.Errorf(
"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
checkPath = volumeMounter.GetPath()
if deviceMountablePlugin != nil {
deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
if err != nil {
return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
}
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, checkPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)
if checkErr != nil {
return nil, checkErr
}
// If mount or symlink doesn't exist, volume reconstruction should be failed
if !isExist {
return nil, fmt.Errorf("volume: %q is not mounted", uniqueVolumeName)
}
reconstructedVolume := &reconstructedVolume{
volumeName: uniqueVolumeName,
podName: volume.podName,
volumeSpec: volumeSpec,
// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
// for volume cleanup.
// in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information.
// See issue #103143 and its fix for details.
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
deviceMounter: deviceMounter,
volumeGidValue: "",
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
devicePath: "",
mounter: volumeMounter,
blockVolumeMapper: volumeMapper,
}
return reconstructedVolume, nil
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
}
}
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
if rc.kubeClient != nil {
rc.updateDevicePath(volumesNeedUpdate)
}
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
// If the volume has device to mount, we mark its device as mounted.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
// TODO(jsafrane): add reconstructed SELinux context
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
continue
}
klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
}
}
return nil
}

View File

@ -0,0 +1,187 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"fmt"
"io/fs"
"os"
"path"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/config"
volumepkg "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
utilpath "k8s.io/utils/path"
utilstrings "k8s.io/utils/strings"
)
type podVolume struct {
podName volumetypes.UniquePodName
volumeSpecName string
volumePath string
pluginName string
volumeMode v1.PersistentVolumeMode
}
type reconstructedVolume struct {
volumeName v1.UniqueVolumeName
podName volumetypes.UniquePodName
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
volumeGidValue string
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
}
// globalVolumeInfo stores reconstructed volume information
// for each pod that was using that volume.
type globalVolumeInfo struct {
volumeName v1.UniqueVolumeName
volumeSpec *volumepkg.Spec
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
podVolumes map[volumetypes.UniquePodName]*reconstructedVolume
}
func (rc *reconciler) updateLastSyncTime() {
rc.timeOfLastSync = time.Now()
}
func (rc *reconciler) StatesHasBeenSynced() bool {
return !rc.timeOfLastSync.IsZero()
}
func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
if gvi.podVolumes == nil {
gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
}
gvi.podVolumes[rcv.podName] = rcv
}
func (rc *reconciler) cleanupMounts(volume podVolume) {
klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
mountedVolume := operationexecutor.MountedVolume{
PodName: volume.podName,
VolumeName: v1.UniqueVolumeName(volume.volumeSpecName),
InnerVolumeSpecName: volume.volumeSpecName,
PluginName: volume.pluginName,
PodUID: types.UID(volume.podName),
}
// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
// to unmount both volume and device in the same routine.
err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
return
}
}
// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
if gvi.blockVolumeMapper != nil {
// for block gvi, we return its global map path
return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
} else if gvi.deviceMounter != nil {
// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
}
// getVolumesFromPodDir scans through the volumes directories under the given pod directory.
// It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
// and volume spec name.
func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
podsDirInfo, err := os.ReadDir(podDir)
if err != nil {
return nil, err
}
volumes := []podVolume{}
for i := range podsDirInfo {
if !podsDirInfo[i].IsDir() {
continue
}
podName := podsDirInfo[i].Name()
podDir := path.Join(podDir, podName)
// Find filesystem volume information
// ex. filesystem volume: /pods/{podUid}/volume/{escapeQualifiedPluginName}/{volumeName}
volumesDirs := map[v1.PersistentVolumeMode]string{
v1.PersistentVolumeFilesystem: path.Join(podDir, config.DefaultKubeletVolumesDirName),
}
// Find block volume information
// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
volumesDirs[v1.PersistentVolumeBlock] = path.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
for volumeMode, volumesDir := range volumesDirs {
var volumesDirInfo []fs.DirEntry
if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
continue
}
for _, volumeDir := range volumesDirInfo {
pluginName := volumeDir.Name()
volumePluginPath := path.Join(volumesDir, pluginName)
volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
if err != nil {
klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
continue
}
unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
for _, volumeName := range volumePluginDirs {
volumePath := path.Join(volumePluginPath, volumeName)
klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
volumes = append(volumes, podVolume{
podName: volumetypes.UniquePodName(podName),
volumeSpecName: volumeName,
volumePath: volumePath,
pluginName: unescapePluginName,
volumeMode: volumeMode,
})
}
}
}
}
klog.V(4).InfoS("Get volumes from pod directory", "path", podDir, "volumes", volumes)
return volumes, nil
}