mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Rework volume reconstruction
Subsequent SELinux work (see http://kep.k8s.io/1710) will need ActualStateOfWorld populated around the time kubelet starts mounting volumes. Therefore reconstruct volumes before starting the reconciler, but do not depend on the desired state of world being populated nor on node.status — both need a working API server, which may not be available at that time. All reconstructed volumes are marked as Uncertain and the reconciler will sort them out — it calls SetUp to ensure the volume is really mounted when a pod needs the volume, or calls TearDown when there is no such pod. Finish the reconstruction when the API server becomes available: - Clean up volumes that failed reconstruction and are not needed. - Update devicePath of reconstructed volumes from node.status. Make sure not to overwrite a devicePath that may have been updated when the volume was mounted by reconcile(). All of this rework is hidden behind the SELinuxMountReadWriteOncePod feature gate, just to make sure we have a way back if this commit is buggy.
This commit is contained in:
parent
989e391d08
commit
e0f3e5c457
@ -171,6 +171,12 @@ type ActualStateOfWorld interface {
|
||||
// SyncReconstructedVolume checks the volume.outerVolumeSpecName in asw and
|
||||
// the one populated from dsw; if they do not match, update this field from the value from dsw.
|
||||
SyncReconstructedVolume(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, outerVolumeSpecName string)
|
||||
|
||||
// UpdateReconstructedDevicePath updates devicePath of a reconstructed volume
|
||||
// from Node.Status.VolumesAttached. The ASW is updated only when the volume is still
|
||||
// uncertain. If the volume got mounted in the meantime, its devicePath must have
|
||||
// been fixed by such an update.
|
||||
UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string)
|
||||
}
|
||||
|
||||
// MountedVolume represents a volume that has successfully been mounted to a pod.
|
||||
@ -501,6 +507,24 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
|
||||
return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "", "")
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) UpdateReconstructedDevicePath(volumeName v1.UniqueVolumeName, devicePath string) {
|
||||
asw.Lock()
|
||||
defer asw.Unlock()
|
||||
|
||||
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
|
||||
if !volumeExists {
|
||||
return
|
||||
}
|
||||
if volumeObj.deviceMountState != operationexecutor.DeviceMountUncertain {
|
||||
// Reconciler must have updated volume state, i.e. when a pod uses the volume and
|
||||
// succeeded mounting the volume. Such update has fixed the device path.
|
||||
return
|
||||
}
|
||||
|
||||
volumeObj.devicePath = devicePath
|
||||
asw.attachedVolumes[volumeName] = volumeObj
|
||||
}
|
||||
|
||||
func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState {
|
||||
asw.RLock()
|
||||
defer asw.RUnlock()
|
||||
@ -636,7 +660,16 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M
|
||||
}
|
||||
|
||||
podObj, podExists := volumeObj.mountedPods[podName]
|
||||
if !podExists {
|
||||
|
||||
updateUncertainVolume := false
|
||||
if podExists {
|
||||
// Update uncertain volumes - the new markVolumeOpts may have updated information.
|
||||
// Especially reconstructed volumes (marked as uncertain during reconstruction) need
|
||||
// an update.
|
||||
updateUncertainVolume = utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain
|
||||
}
|
||||
if !podExists || updateUncertainVolume {
|
||||
// Add new mountedPod or update existing one.
|
||||
podObj = mountedPod{
|
||||
podName: podName,
|
||||
podUID: podUID,
|
||||
|
@ -153,7 +153,10 @@ func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady,
|
||||
return done, nil
|
||||
}, stopCh)
|
||||
dswp.hasAddedPodsLock.Lock()
|
||||
dswp.hasAddedPods = true
|
||||
if !dswp.hasAddedPods {
|
||||
klog.InfoS("Finished populating initial desired state of world")
|
||||
dswp.hasAddedPods = true
|
||||
}
|
||||
dswp.hasAddedPodsLock.Unlock()
|
||||
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
|
||||
}
|
||||
|
@ -23,13 +23,14 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||
volumepkg "k8s.io/kubernetes/pkg/volume"
|
||||
@ -121,6 +122,9 @@ func NewReconciler(
|
||||
volumePluginMgr: volumePluginMgr,
|
||||
kubeletPodsDir: kubeletPodsDir,
|
||||
timeOfLastSync: time.Time{},
|
||||
volumesFailedReconstruction: make([]podVolume, 0),
|
||||
volumesNeedDevicePath: make([]v1.UniqueVolumeName, 0),
|
||||
volumesNeedReportedInUse: make([]v1.UniqueVolumeName, 0),
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,9 +144,17 @@ type reconciler struct {
|
||||
skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo
|
||||
kubeletPodsDir string
|
||||
timeOfLastSync time.Time
|
||||
volumesFailedReconstruction []podVolume
|
||||
volumesNeedDevicePath []v1.UniqueVolumeName
|
||||
volumesNeedReportedInUse []v1.UniqueVolumeName
|
||||
}
|
||||
|
||||
func (rc *reconciler) Run(stopCh <-chan struct{}) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
|
||||
rc.runNew(stopCh)
|
||||
return
|
||||
}
|
||||
|
||||
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
|
||||
}
|
||||
|
||||
|
243
pkg/kubelet/volumemanager/reconciler/reconstruct_new.go
Normal file
243
pkg/kubelet/volumemanager/reconciler/reconstruct_new.go
Normal file
@ -0,0 +1,243 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reconciler
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
)
|
||||
|
||||
// TODO: move to reconstruct.go and remove old code there.

// runNew is the Run() replacement used while SELinuxMountReadWriteOncePod is
// enabled: it reconstructs volumes from disk into the actual state of world
// first (as uncertain), then runs the periodic reconcile loop until stopCh
// is closed.
// TODO: Replace Run() when SELinuxMountReadWriteOncePod is GA
func (rc *reconciler) runNew(stopCh <-chan struct{}) {
	// Populate ASW before the first reconcile pass; this does not need the
	// API server nor a populated DSW.
	rc.reconstructVolumes()
	klog.InfoS("Reconciler: start to sync state")
	wait.Until(rc.reconcileNew, rc.loopSleepDuration, stopCh)
}
|
||||
|
||||
func (rc *reconciler) reconcileNew() {
|
||||
readyToUnmount := rc.readyToUnmount()
|
||||
if readyToUnmount {
|
||||
// Unmounts are triggered before mounts so that a volume that was
|
||||
// referenced by a pod that was deleted and is now referenced by another
|
||||
// pod is unmounted from the first pod before being mounted to the new
|
||||
// pod.
|
||||
rc.unmountVolumes()
|
||||
}
|
||||
|
||||
// Next we mount required volumes. This function could also trigger
|
||||
// attach if kubelet is responsible for attaching volumes.
|
||||
// If underlying PVC was resized while in-use then this function also handles volume
|
||||
// resizing.
|
||||
rc.mountOrAttachVolumes()
|
||||
|
||||
// Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume
|
||||
// that is still needed, but it did not reach DSW yet.
|
||||
if readyToUnmount {
|
||||
// Ensure devices that should be detached/unmounted are detached/unmounted.
|
||||
rc.unmountDetachDevices()
|
||||
|
||||
// Clean up any orphan volumes that failed reconstruction.
|
||||
rc.cleanOrphanVolumes()
|
||||
}
|
||||
|
||||
if len(rc.volumesNeedDevicePath) != 0 {
|
||||
rc.updateReconstructedDevicePaths()
|
||||
}
|
||||
|
||||
if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
|
||||
// Once DSW is populated, mark all reconstructed as reported in node.status,
|
||||
// so they can proceed with MountDevice / SetUp.
|
||||
rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse)
|
||||
rc.volumesNeedReportedInUse = nil
|
||||
}
|
||||
}
|
||||
|
||||
// readyToUnmount returns true when reconciler can start unmounting volumes.
|
||||
func (rc *reconciler) readyToUnmount() bool {
|
||||
// During kubelet startup, all volumes present on disk are added as uncertain to ASW.
|
||||
// Allow unmount only when DSW is fully populated to prevent unmounting volumes that
|
||||
// did not reach DSW yet.
|
||||
if !rc.populatorHasAddedPods() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Allow unmount only when ASW device paths were corrected from node.status to prevent
|
||||
// calling unmount with a wrong devicePath.
|
||||
if len(rc.volumesNeedDevicePath) != 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// reconstructVolumes tries to reconstruct the actual state of world by scanning all pods' volume
// directories from the disk. For the volumes that cannot support or fail reconstruction, it will
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
// is populated.
//
// Every successfully reconstructed volume is added to ASW as uncertain and its
// name is remembered in volumesNeedReportedInUse and volumesNeedDevicePath so
// the reconstruction can be finished once the API server is reachable.
func (rc *reconciler) reconstructVolumes() {
	// Record the sync time even if reconstruction bails out early.
	defer rc.updateLastSyncTime()
	// Get volumes information by reading the pod's directory
	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
	if err != nil {
		klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
		return
	}
	// Reconstructed volumes grouped by unique volume name; one volume may be
	// used by several pods.
	reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
	reconstructedVolumeNames := []v1.UniqueVolumeName{}
	for _, volume := range podVolumes {
		if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
			klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
			// There is nothing to reconstruct
			continue
		}
		reconstructedVolume, err := rc.reconstructVolume(volume)
		if err != nil {
			klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
			// We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned.
			rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume)
			continue
		}
		klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
		gvl := &globalVolumeInfo{
			volumeName:        reconstructedVolume.volumeName,
			volumeSpec:        reconstructedVolume.volumeSpec,
			devicePath:        reconstructedVolume.devicePath,
			deviceMounter:     reconstructedVolume.deviceMounter,
			blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
			mounter:           reconstructedVolume.mounter,
		}
		// Reuse the existing entry when another pod already reconstructed
		// this volume; only the per-pod info is added below.
		if cachedInfo, ok := reconstructedVolumes[reconstructedVolume.volumeName]; ok {
			gvl = cachedInfo
		}
		gvl.addPodVolume(reconstructedVolume)

		// NOTE(review): the name is appended once per pod volume, so a volume
		// shared by several pods appears more than once in this list —
		// presumably harmless to MarkVolumesReportedInUse and the devicePath
		// update loop; confirm.
		reconstructedVolumeNames = append(reconstructedVolumeNames, reconstructedVolume.volumeName)
		reconstructedVolumes[reconstructedVolume.volumeName] = gvl
	}

	if len(reconstructedVolumes) > 0 {
		// Add the volumes to ASW
		rc.updateStatesNew(reconstructedVolumes)

		// The reconstructed volumes are mounted, hence a previous kubelet must have already put it into node.status.volumesInUse.
		// Remember to update DSW with this information.
		rc.volumesNeedReportedInUse = reconstructedVolumeNames
		// Remember to update devicePath from node.status.volumesAttached
		rc.volumesNeedDevicePath = reconstructedVolumeNames
	}
	klog.V(2).InfoS("Volume reconstruction finished")
}
|
||||
|
||||
// updateStatesNew adds the reconstructed volumes to the actual state of world:
// each volume is marked as attached, each pod mount as uncertain, and — when
// the plugin uses a global device mount — the device as uncertain too. The
// reconciler later either confirms these mounts via SetUp or tears them down.
// Errors are logged and the affected volume is skipped; reconstruction is
// best-effort.
func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
	for _, gvl := range reconstructedVolumes {
		err := rc.actualStateOfWorld.MarkVolumeAsAttached(
			//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
			gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath)
		if err != nil {
			klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
			continue
		}
		for _, volume := range gvl.podVolumes {
			// NOTE(review): PodUID is derived from the unique pod name here —
			// presumably they coincide for reconstructed volumes; confirm.
			markVolumeOpts := operationexecutor.MarkVolumeOpts{
				PodName:             volume.podName,
				PodUID:              types.UID(volume.podName),
				VolumeName:          volume.volumeName,
				Mounter:             volume.mounter,
				BlockVolumeMapper:   volume.blockVolumeMapper,
				OuterVolumeSpecName: volume.outerVolumeSpecName,
				VolumeGidVolume:     volume.volumeGidValue,
				VolumeSpec:          volume.volumeSpec,
				// Uncertain: reconciler must confirm via SetUp or tear down.
				VolumeMountState: operationexecutor.VolumeMountUncertain,
			}
			err = rc.actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts)
			if err != nil {
				klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
				continue
			}
			klog.V(4).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
		}
		// If the volume has device to mount, we mark its device as mounted.
		if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
			deviceMountPath, err := getDeviceMountPath(gvl)
			if err != nil {
				klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
				continue
			}
			err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
			if err != nil {
				klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
				continue
			}
			klog.V(4).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
		}
	}
}
|
||||
|
||||
// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction.
|
||||
func (rc *reconciler) cleanOrphanVolumes() {
|
||||
if len(rc.volumesFailedReconstruction) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, volume := range rc.volumesFailedReconstruction {
|
||||
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
|
||||
// Some pod needs the volume, don't clean it up and hope that
|
||||
// reconcile() calls SetUp and reconstructs the volume in ASW.
|
||||
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
|
||||
continue
|
||||
}
|
||||
klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
|
||||
rc.cleanupMounts(volume)
|
||||
}
|
||||
|
||||
klog.V(2).InfoS("Orphan volume cleanup finished")
|
||||
// Clean the cache, cleanup is one shot operation.
|
||||
rc.volumesFailedReconstruction = make([]podVolume, 0)
|
||||
}
|
||||
|
||||
// updateReconstructedDevicePaths tries to find devicePaths of reconstructed volumes from
// node.Status.VolumesAttached. This can be done only after connection to the API
// server is established, i.e. it can't be part of reconstructVolumes().
//
// ASW refuses the update for volumes that are no longer uncertain (see
// UpdateReconstructedDevicePath), so a devicePath fixed by a successful mount
// is never overwritten here.
func (rc *reconciler) updateReconstructedDevicePaths() {
	klog.V(4).InfoS("Updating reconstructed devicePaths")

	node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
	if fetchErr != nil {
		// This may repeat few times per second until kubelet is able to read its own status for the first time.
		klog.ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
		return
	}

	for _, volumeID := range rc.volumesNeedDevicePath {
		for _, attachedVolume := range node.Status.VolumesAttached {
			if volumeID != attachedVolume.Name {
				continue
			}
			rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
			klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
		}
	}
	klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
	// One-shot: the list is cleared even for volumes that had no entry in
	// node.Status.VolumesAttached.
	rc.volumesNeedDevicePath = nil
}
|
Loading…
Reference in New Issue
Block a user