Merge pull request #123442 from jsafrane/reconstruction-ga

KEP 3756: Mark NewVolumeManagerReconstruction as GA
Authored by Kubernetes Prow Robot on 2024-02-22 08:14:55 -08:00; committed by GitHub.
Commit 58bbf69913
11 changed files with 259 additions and 966 deletions

View File

@ -510,6 +510,7 @@ const (
// kep: https://kep.k8s.io/3756
// alpha: v1.25 (as part of SELinuxMountReadWriteOncePod)
// beta: v1.27
// GA: v1.30
// Robust VolumeManager reconstruction after kubelet restart.
NewVolumeManagerReconstruction featuregate.Feature = "NewVolumeManagerReconstruction"
@ -1055,7 +1056,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
MultiCIDRServiceAllocator: {Default: false, PreRelease: featuregate.Alpha},
NewVolumeManagerReconstruction: {Default: true, PreRelease: featuregate.Beta},
NewVolumeManagerReconstruction: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
NFTablesProxyMode: {Default: false, PreRelease: featuregate.Alpha},
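
For context: once a feature gate is GA with LockToDefault, as in the map entry above, checks against it always return true, and kubelet refuses to start if the gate is explicitly set to false on the command line. A minimal sketch of such a check (illustrative only, not code from this commit); the remaining files in this commit delete exactly these branches:

package example // illustrative only; the imported names exist in the Kubernetes tree

import (
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "k8s.io/kubernetes/pkg/features"
)

// newReconstructionEnabled always evaluates to true once the gate is GA and
// locked to its default; --feature-gates=NewVolumeManagerReconstruction=false
// now fails feature-gate validation instead of changing this result.
func newReconstructionEnabled() bool {
    return utilfeature.DefaultFeatureGate.Enabled(features.NewVolumeManagerReconstruction)
}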

View File

@ -34,10 +34,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/component-helpers/storage/ephemeral"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
@ -324,7 +322,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
}
// Add volume to desired state of world
uniqueVolumeName, err := dswp.desiredStateOfWorld.AddPodToVolume(
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue, seLinuxContainerContexts[podVolume.Name])
if err != nil {
klog.ErrorS(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
@ -333,12 +331,6 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
} else {
klog.V(4).InfoS("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
}
if !utilfeature.DefaultFeatureGate.Enabled(features.NewVolumeManagerReconstruction) {
// sync reconstructed volume. This is necessary only when the old-style reconstruction is still used.
// With reconstruct_new.go, ASW.MarkVolumeAsMounted will update the outer spec name of previously
// uncertain volumes.
dswp.actualStateOfWorld.SyncReconstructedVolume(uniqueVolumeName, uniquePodName, podVolume.Name)
}
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod)
}
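
For context: the branch removed above was the last consumer of the unique volume name returned by AddPodToVolume, which is why the result is now discarded. Per the deleted comment, the new reconstruction path corrects the outer spec name when the volume is mounted for a real pod, because MarkVolumeOpts already carries it. A rough sketch under that assumption; the helper name and parameter list are hypothetical, while the option fields and the MarkVolumeAsMounted call mirror those shown elsewhere in this diff:

package example // illustrative only

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
    "k8s.io/kubernetes/pkg/volume"
    "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
    volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

// markReconstructedVolumeMounted sketches how mounting a volume for a real pod
// updates the actual state of world with the pod-provided outer spec name,
// replacing whatever name reconstruction could only guess.
func markReconstructedVolumeMounted(asw cache.ActualStateOfWorld, pod *v1.Pod,
    uniquePodName volumetypes.UniquePodName, uniqueVolumeName v1.UniqueVolumeName,
    outerName string, spec *volume.Spec) {
    opts := operationexecutor.MarkVolumeOpts{
        PodName:             uniquePodName,
        PodUID:              pod.UID,
        VolumeName:          uniqueVolumeName,
        OuterVolumeSpecName: outerName, // corrects the value set during reconstruction
        VolumeSpec:          spec,
        VolumeMountState:    operationexecutor.VolumeMounted,
    }
    if err := asw.MarkVolumeAsMounted(opts); err != nil {
        klog.ErrorS(err, "Failed to mark volume as mounted", "volumeName", uniqueVolumeName)
    }
}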

View File

@ -89,78 +89,6 @@ func prepareDswpWithVolume(t *testing.T) (*desiredStateOfWorldPopulator, kubepod
return dswp, fakePodManager, fakePodStateProvider
}
func TestFindAndAddNewPods_WithRescontructedVolume(t *testing.T) {
// Outer volume spec replacement is needed only when the old volume reconstruction is used
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NewVolumeManagerReconstruction, false)()
// create dswp
dswp, fakePodManager, _ := prepareDswpWithVolume(t)
// create pod
fakeOuterVolumeName := "dswp-test-volume-name"
containers := []v1.Container{
{
VolumeMounts: []v1.VolumeMount{
{
Name: fakeOuterVolumeName,
MountPath: "/mnt",
},
},
},
}
pod := createPodWithVolume("dswp-test-pod", fakeOuterVolumeName, "file-bound", containers)
fakePodManager.AddPod(pod)
podName := util.GetUniquePodName(pod)
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: fakeOuterVolumeName,
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"},
VolumeMode: &mode,
},
}
generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name
uniqueVolumeName := v1.UniqueVolumeName(generatedVolumeName)
expectedOuterVolumeName := "dswp-test-volume-name"
opts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: uniqueVolumeName,
OuterVolumeSpecName: generatedVolumeName, // fake reconstructed volume
VolumeGidVolume: "",
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
VolumeMountState: operationexecutor.VolumeMounted,
}
logger, _ := ktesting.NewTestContext(t)
dswp.actualStateOfWorld.MarkVolumeAsAttached(logger, opts.VolumeName, opts.VolumeSpec, "fake-node", "")
dswp.actualStateOfWorld.MarkVolumeAsMounted(opts)
dswp.findAndAddNewPods()
mountedVolumes := dswp.actualStateOfWorld.GetMountedVolumesForPod(podName)
found := false
for _, volume := range mountedVolumes {
if volume.OuterVolumeSpecName == expectedOuterVolumeName {
found = true
break
}
}
if dswp.hasAddedPods {
t.Fatalf("HasAddedPod should be false but it is true")
}
if !found {
t.Fatalf(
"Could not found pod volume %v in the list of actual state of world volumes to mount.",
expectedOuterVolumeName)
}
}
func TestFindAndAddNewPods_WithDifferentConditions(t *testing.T) {
tests := []struct {
desc string

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,42 +14,28 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package reconciler implements interfaces that attempt to reconcile the
// desired state of the world with the actual state of the world by triggering
// relevant actions (attach, detach, mount, unmount).
package reconciler
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
func (rc *reconciler) runOld(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
func (rc *reconciler) Run(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcileNew, rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
// desired state of world does not contain a complete list of pods.
if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
klog.InfoS("Reconciler: start to sync state")
rc.sync()
}
func (rc *reconciler) reconcileNew() {
readyToUnmount := rc.readyToUnmount()
if readyToUnmount {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
}
}
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
@ -57,76 +43,30 @@ func (rc *reconciler) reconcile() {
// resizing.
rc.mountOrAttachVolumes()
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume
// that is still needed, but it did not reach DSW yet.
if readyToUnmount {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// After running the above operations if skippedDuringReconstruction is not empty
// then ensure that all volumes which were discovered and skipped during reconstruction
// are added to actualStateOfWorld in uncertain state.
if len(rc.skippedDuringReconstruction) > 0 {
rc.processReconstructedVolumes()
}
}
// processReconstructedVolumes checks volumes which were skipped during the reconstruction
// process because it was assumed that since these volumes were present in DSOW they would get
// mounted correctly and make it into ASOW.
// But if mount operation fails for some reason then we still need to mark the volume as uncertain
// and wait for the next reconciliation loop to deal with it.
func (rc *reconciler) processReconstructedVolumes() {
for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction {
// check if volume is marked as attached to the node
// for now lets only process volumes which are at least known as attached to the node
// this should help with most volume types (including secret, configmap etc)
if !rc.actualStateOfWorld.VolumeExists(volumeName) {
klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName)
continue
}
uncertainVolumeCount := 0
// only delete volumes which were marked as attached here.
// This should ensure that - we will wait for volumes which were not marked as attached
// before adding them in uncertain state during reconstruction.
delete(rc.skippedDuringReconstruction, volumeName)
for podName, volume := range glblVolumeInfo.podVolumes {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
volumeAdded, err := rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
// if volume is not mounted then lets mark volume mounted in uncertain state in ASOW
if volumeAdded {
uncertainVolumeCount += 1
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(4).InfoS("Volume is marked as mounted in uncertain state and added to the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
}
}
if uncertainVolumeCount > 0 {
// If the volume has device to mount, we mark its device as uncertain
if glblVolumeInfo.deviceMounter != nil || glblVolumeInfo.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(glblVolumeInfo)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName)
continue
}
deviceMounted := rc.actualStateOfWorld.CheckAndMarkDeviceUncertainViaReconstruction(glblVolumeInfo.volumeName, deviceMountPath)
if !deviceMounted {
klog.V(3).InfoS("Could not mark device as mounted in uncertain state", "volumeName", glblVolumeInfo.volumeName)
}
}
}
// Clean up any orphan volumes that failed reconstruction.
rc.cleanOrphanVolumes()
}
if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
rc.updateReconstructedFromNodeStatus()
}
if len(rc.volumesNeedUpdateFromNodeStatus) == 0 {
// ASW is fully populated only after both devicePaths and uncertain volume attach-ability
// were reconstructed from the API server.
// This will start reconciliation of node.status.volumesInUse.
rc.updateLastSyncTime()
}
if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
// Once DSW is populated, mark all reconstructed as reported in node.status,
// so they can proceed with MountDevice / SetUp.
rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse)
rc.volumesNeedReportedInUse = nil
}
}
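
Because removed and added lines interleave above, the new single reconcile pass may be easier to read as a condensed sketch (method names as they appear in this diff; a restatement for readability, not standalone code):

func (rc *reconciler) reconcileSketch() {
    readyToUnmount := rc.readyToUnmount() // DSW fully populated and device paths fixed from node.status
    if readyToUnmount {
        // Unmount first, so a volume that moved from a deleted pod to a new
        // pod is released before being mounted again.
        rc.unmountVolumes()
    }
    // Mount (and, when kubelet attaches, attach) all required volumes; this
    // also handles in-use volume resizing.
    rc.mountOrAttachVolumes()
    if readyToUnmount {
        rc.unmountDetachDevices() // global unmount / detach
        rc.cleanOrphanVolumes()   // force-clean volumes that failed reconstruction
    }
    if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
        rc.updateReconstructedFromNodeStatus()
    } else {
        // ASW is fully populated; node.status.volumesInUse reconciliation may start.
        rc.updateLastSyncTime()
    }
    if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
        // Once DSW is populated, mark all reconstructed volumes as reported in
        // node.status so they can proceed with MountDevice / SetUp.
        rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse)
        rc.volumesNeedReportedInUse = nil
    }
}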

View File

@ -24,10 +24,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
volumepkg "k8s.io/kubernetes/pkg/volume"
@ -148,15 +146,6 @@ type reconciler struct {
volumesNeedReportedInUse []v1.UniqueVolumeName
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
if utilfeature.DefaultFeatureGate.Enabled(features.NewVolumeManagerReconstruction) {
rc.runNew(stopCh)
return
}
rc.runOld(stopCh)
}
func (rc *reconciler) unmountVolumes() {
// Ensure volumes that should be unmounted are unmounted.
for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {

View File

@ -1,75 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
// TODO: move to reconciler.go and remove old code there when NewVolumeManagerReconstruction is GA
// TODO: Replace Run() when NewVolumeManagerReconstruction is GA
func (rc *reconciler) runNew(stopCh <-chan struct{}) {
rc.reconstructVolumes()
klog.InfoS("Reconciler: start to sync state")
wait.Until(rc.reconcileNew, rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconcileNew() {
readyToUnmount := rc.readyToUnmount()
if readyToUnmount {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
}
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountOrAttachVolumes()
// Unmount volumes only when DSW and ASW are fully populated to prevent unmounting a volume
// that is still needed, but it did not reach DSW yet.
if readyToUnmount {
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
// Clean up any orphan volumes that failed reconstruction.
rc.cleanOrphanVolumes()
}
if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
rc.updateReconstructedFromNodeStatus()
}
if len(rc.volumesNeedUpdateFromNodeStatus) == 0 {
// ASW is fully populated only after both devicePaths and uncertain volume attach-ability
// were reconstructed from the API server.
// This will start reconciliation of node.status.volumesInUse.
rc.updateLastSyncTime()
}
if len(rc.volumesNeedReportedInUse) != 0 && rc.populatorHasAddedPods() {
// Once DSW is populated, mark all reconstructed as reported in node.status,
// so they can proceed with MountDevice / SetUp.
rc.desiredStateOfWorld.MarkVolumesReportedInUse(rc.volumesNeedReportedInUse)
rc.volumesNeedReportedInUse = nil
}
}

View File

@ -1,144 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/mount-utils"
)
func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
// Calls Run() with two reconstructed volumes.
// Verifies the devicePaths + volume attachability are reconstructed from node.status.
// Arrange
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler))
rc := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
mount.NewFakeMounter(nil),
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
reconciler := rc.(*reconciler)
// The pod has two volumes, fake-device1 is attachable, fake-device2 is not.
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
{
Name: "volume-name2",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
}
volumeSpec1 := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName1 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device1")
volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))
assert.False(t, reconciler.StatesHasBeenSynced())
reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)
// Act - run reconcile loop just once.
// "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered.
reconciler.reconcileNew()
// Assert
assert.True(t, reconciler.StatesHasBeenSynced())
assert.Empty(t, reconciler.volumesNeedUpdateFromNodeStatus)
attachedVolumes := asw.GetAttachedVolumes()
assert.Equalf(t, len(attachedVolumes), 2, "two volumes in ASW expected")
for _, vol := range attachedVolumes {
if vol.VolumeName == volumeName1 {
// devicePath + attachability must have been updated from node.status
assert.True(t, vol.PluginIsAttachable)
assert.Equal(t, vol.DevicePath, "fake/path")
}
if vol.VolumeName == volumeName2 {
// only attachability was updated from node.status
assert.False(t, vol.PluginIsAttachable)
assert.Equal(t, vol.DevicePath, "/dev/reconstructed")
}
}
}

View File

@ -19,7 +19,6 @@ package reconciler
import (
"crypto/md5"
"fmt"
"os"
"path/filepath"
"testing"
"time"
@ -40,7 +39,6 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
@ -2295,249 +2293,113 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string, kubeCl
return rc, fakePlugin
}
func TestSyncStates(t *testing.T) {
type podInfo struct {
podName string
podUID string
outerVolumeName string
innerVolumeName string
}
defaultPodInfo := podInfo{
podName: "pod1",
podUID: "pod1uid",
outerVolumeName: "volume-name",
innerVolumeName: "volume-name",
}
tests := []struct {
name string
volumePaths []string
createMountPoint bool
podInfos []podInfo
postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
}{
{
name: "when two pods are using same volume and both are deleted",
volumePaths: []string{
filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
filepath.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: true,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 2 {
return fmt.Errorf("expected 2 pods to in asw got %d", len(mountedPods))
}
return nil
},
func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
// Calls Run() with two reconstructed volumes.
// Verifies the devicePaths + volume attachability are reconstructed from node.status.
// Arrange
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: string(nodeName),
},
{
name: "when two pods are using same volume and one of them is deleted",
volumePaths: []string{
filepath.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
filepath.Join("pod2uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
podInfos: []podInfo{
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
podName: "pod2",
podUID: "pod2uid",
outerVolumeName: "volume-name",
innerVolumeName: "volume-name",
Name: "fake-plugin/fake-device1",
DevicePath: "fake/path",
},
},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
// for pod that is deleted, volume is considered as mounted
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods))
}
if types.UniquePodName("pod1uid") != mountedPods[0].PodName {
return fmt.Errorf("expected mounted pod to be %s got %s", "pod1uid", mountedPods[0].PodName)
}
// for pod that is in dsw, volume is in skippedDuringReconstruction
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) != 1 {
return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes))
}
if skippedVolumes["fake-plugin/volume-name"] == nil {
return fmt.Errorf("expected %s is in skippedDuringReconstruction, got %+v", "fake-plugin/volume-name", skippedVolumes)
}
return nil
},
},
{
name: "when reconstruction fails for a volume, volumes should be cleaned up",
volumePaths: []string{
filepath.Join("pod1", "volumes", "fake-plugin", volumetesting.FailNewMounter),
},
createMountPoint: true,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
if err != nil {
return false, nil
}
return true, nil
})
},
}
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler))
rc := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
mount.NewFakeMounter(nil),
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
reconciler := rc.(*reconciler)
// The pod has two volumes, fake-device1 is attachable, fake-device2 is not.
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
{
name: "when mount point does not exist, reconstruction should not fail, volumes should be added in asw",
volumePaths: []string{
filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
},
createMountPoint: false,
podInfos: []podInfo{},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods))
}
return nil
},
},
{
name: "when mount point does not exist, reconstruction should not fail, if volume exists in dsw, volume should be recorded in skipped during reconstruction",
volumePaths: []string{
filepath.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: false,
podInfos: []podInfo{defaultPodInfo},
postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) != 1 {
return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes))
}
rcInstance.processReconstructedVolumes()
return nil
},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods))
}
mountedPodVolume := mountedPods[0]
addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName)
if !addedViaReconstruction {
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
}
// check device mount state
attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
if len(attachedVolumes) != 1 {
return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes))
}
firstAttachedVolume := attachedVolumes[0]
if !firstAttachedVolume.DeviceMayBeMounted() {
return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName)
}
// also skippedVolumes map should be empty
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) > 0 {
return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes))
}
return nil
},
},
{
name: "when volume exists in dsow, volume should be recorded in skipped during reconstruction",
volumePaths: []string{
filepath.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
},
createMountPoint: true,
podInfos: []podInfo{defaultPodInfo},
postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) != 1 {
return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes))
}
rcInstance.processReconstructedVolumes()
return nil
},
verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
mountedPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
if len(mountedPods) != 1 {
return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods))
}
mountedPodVolume := mountedPods[0]
addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName)
if !addedViaReconstruction {
return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
}
// check device mount state
attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
if len(attachedVolumes) != 1 {
return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes))
}
firstAttachedVolume := attachedVolumes[0]
if !firstAttachedVolume.DeviceMayBeMounted() {
return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName)
}
// also skippedVolumes map should be empty
skippedVolumes := rcInstance.skippedDuringReconstruction
if len(skippedVolumes) > 0 {
return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes))
}
return nil
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
{
Name: "volume-name2",
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tmpKubeletDir, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
}
defer os.RemoveAll(tmpKubeletDir)
// create kubelet pod directory
tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
os.MkdirAll(tmpKubeletPodDir, 0755)
volumeSpec1 := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName1 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device1")
volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")
mountPaths := []string{}
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))
// create pod and volume directories so as reconciler can find them.
for _, volumePath := range tc.volumePaths {
vp := filepath.Join(tmpKubeletPodDir, volumePath)
if tc.createMountPoint {
mountPaths = append(mountPaths, vp)
}
os.MkdirAll(vp, 0755)
}
assert.False(t, reconciler.StatesHasBeenSynced())
rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, nil /*custom kubeclient*/)
rcInstance, _ := rc.(*reconciler)
logger, _ := ktesting.NewTestContext(t)
for _, tpodInfo := range tc.podInfos {
pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
podName := util.GetUniquePodName(pod)
volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
if err != nil {
t.Fatalf("error adding volume %s to dsow: %v", volumeSpec.Name(), err)
}
rcInstance.actualStateOfWorld.MarkVolumeAsAttached(logger, volumeName, volumeSpec, nodeName, "")
}
reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)
// Act - run reconcile loop just once.
// "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered.
reconciler.reconcileNew()
rcInstance.syncStates(tmpKubeletPodDir)
if tc.postSyncStatCallback != nil {
err := tc.postSyncStatCallback(rcInstance, fakePlugin)
if err != nil {
t.Errorf("test %s, postSyncStatCallback failed: %v", tc.name, err)
}
}
// Assert
assert.True(t, reconciler.StatesHasBeenSynced())
assert.Empty(t, reconciler.volumesNeedUpdateFromNodeStatus)
if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
t.Errorf("test %s failed: %v", tc.name, err)
}
})
attachedVolumes := asw.GetAttachedVolumes()
assert.Equalf(t, len(attachedVolumes), 2, "two volumes in ASW expected")
for _, vol := range attachedVolumes {
if vol.VolumeName == volumeName1 {
// devicePath + attachability must have been updated from node.status
assert.True(t, vol.PluginIsAttachable)
assert.Equal(t, vol.DevicePath, "fake/path")
}
if vol.VolumeName == volumeName2 {
// only attachability was updated from node.status
assert.False(t, vol.PluginIsAttachable)
assert.Equal(t, vol.DevicePath, "/dev/reconstructed")
}
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -23,55 +23,55 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
defer rc.updateLastSyncTime()
rc.syncStates(rc.kubeletPodsDir)
// TODO: move to reconstruct.go and remove old code there.
// readyToUnmount returns true when reconciler can start unmounting volumes.
func (rc *reconciler) readyToUnmount() bool {
// During kubelet startup, all volumes present on disk are added as uncertain to ASW.
// Allow unmount only when DSW is fully populated to prevent unmounting volumes that
// did not reach DSW yet.
if !rc.populatorHasAddedPods() {
return false
}
// Allow unmount only when ASW device paths were corrected from node.status to prevent
// calling unmount with a wrong devicePath.
if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
return false
}
return true
}
// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates(kubeletPodDir string) {
// reconstructVolumes tries to reconstruct the actual state of world by scanning all pods' volume
// directories from the disk. For the volumes that cannot support or fail reconstruction, it will
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
// is populated.
func (rc *reconciler) reconstructVolumes() {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(kubeletPodDir)
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
volumeNeedReport := []v1.UniqueVolumeName{}
reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
reconstructedVolumeNames := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
if volumeInDSW {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// No pod needs the volume.
klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
rc.cleanupMounts(volume)
klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
// We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned.
rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume)
continue
}
klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
@ -80,113 +80,133 @@ func (rc *reconciler) syncStates(kubeletPodDir string) {
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if volumeInDSW {
// Some pod needs the volume. And it exists on disk. Some previous
// kubelet must have created the directory, therefore it must have
// reported the volume as in use. Mark the volume as in use also in
// this new kubelet so reconcile() calls SetUp and re-mounts the
// volume if it's necessary.
volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
if cachedInfo, ok := rc.skippedDuringReconstruction[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl
klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
// There is no pod that uses the volume.
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
}
klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok {
if cachedInfo, ok := reconstructedVolumes[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
volumesNeedUpdate[reconstructedVolume.volumeName] = gvl
reconstructedVolumeNames = append(reconstructedVolumeNames, reconstructedVolume.volumeName)
reconstructedVolumes[reconstructedVolume.volumeName] = gvl
}
if len(volumesNeedUpdate) > 0 {
if err = rc.updateStates(volumesNeedUpdate); err != nil {
klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
}
}
if len(volumeNeedReport) > 0 {
rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
if len(reconstructedVolumes) > 0 {
// Add the volumes to ASW
rc.updateStatesNew(reconstructedVolumes)
// The reconstructed volumes are mounted, hence a previous kubelet must have already put it into node.status.volumesInUse.
// Remember to update DSW with this information.
rc.volumesNeedReportedInUse = reconstructedVolumeNames
// Remember to update devicePath from node.status.volumesAttached
rc.volumesNeedUpdateFromNodeStatus = reconstructedVolumeNames
}
klog.V(2).InfoS("Volume reconstruction finished")
}
// updateDevicePath gets the node status to retrieve volume device path information.
func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) {
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error")
} else {
for _, attachedVolume := range node.Status.VolumesAttached {
if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists {
volume.devicePath = attachedVolume.DevicePath
volumesNeedUpdate[attachedVolume.Name] = volume
klog.V(4).InfoS("Update devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", volume.devicePath)
}
}
}
}
func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error {
// Get the node status to retrieve volume device path information.
// Skip reporting devicePath in node objects if kubeClient is nil.
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc.
if rc.kubeClient != nil {
rc.updateDevicePath(volumesNeedUpdate)
}
for _, gvl := range volumesNeedUpdate {
err := rc.actualStateOfWorld.MarkVolumeAsAttached(
func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
for _, gvl := range reconstructedVolumes {
err := rc.actualStateOfWorld.AddAttachUncertainReconstructedVolume(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
klog.TODO(), gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
var seLinuxMountContext string
for _, volume := range gvl.podVolumes {
err = rc.markVolumeState(volume, operationexecutor.VolumeMounted)
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
SELinuxMountContext: volume.seLinuxMountContext,
}
_, err = rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
klog.V(2).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
seLinuxMountContext = volume.seLinuxMountContext
klog.V(2).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext)
}
// If the volume has device to mount, we mark its device as mounted.
// If the volume has device to mount, we mark its device as uncertain.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath, "")
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, seLinuxMountContext)
if err != nil {
klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName)
klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
continue
}
klog.V(2).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName)
klog.V(2).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
}
}
return nil
}
func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: volumeState,
// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction.
func (rc *reconciler) cleanOrphanVolumes() {
if len(rc.volumesFailedReconstruction) == 0 {
return
}
err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
return err
for _, volume := range rc.volumesFailedReconstruction {
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
rc.cleanupMounts(volume)
}
klog.V(2).InfoS("Orphan volume cleanup finished")
// Clean the cache, cleanup is one shot operation.
rc.volumesFailedReconstruction = make([]podVolume, 0)
}
// updateReconstructedFromNodeStatus tries to fill devicePaths of reconstructed volumes from
// node.Status.VolumesAttached. This can be done only after connection to the API
// server is established, i.e. it can't be part of reconstructVolumes().
func (rc *reconciler) updateReconstructedFromNodeStatus() {
klog.V(4).InfoS("Updating reconstructed devicePaths")
if rc.kubeClient == nil {
// Skip reconstructing devicePath from node objects if kubelet is in standalone mode.
// Such kubelet is not expected to mount any attachable volume or Secrets / ConfigMap.
klog.V(2).InfoS("Skipped reconstruction of DevicePaths from node.status in standalone mode")
rc.volumesNeedUpdateFromNodeStatus = nil
return
}
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
// This may repeat a few times per second until kubelet is able to read its own status for the first time.
klog.V(4).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
return
}
for _, volumeID := range rc.volumesNeedUpdateFromNodeStatus {
attachable := false
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeID != attachedVolume.Name {
continue
}
rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
attachable = true
klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
}
rc.actualStateOfWorld.UpdateReconstructedVolumeAttachability(volumeID, attachable)
}
klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
rc.volumesNeedUpdateFromNodeStatus = nil
}
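
For context: updateReconstructedFromNodeStatus reads only node.Status.VolumesAttached from the kubelet's own Node object. An illustrative value of that field (the volume name follows the fake test fixtures used in this diff; real clusters carry plugin-specific unique names):

// Illustrative node status consumed by updateReconstructedFromNodeStatus.
node := &v1.Node{
    ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
    Status: v1.NodeStatus{
        VolumesAttached: []v1.AttachedVolume{
            // Entries matched by unique volume name: DevicePath is copied into
            // ASW and the volume is marked attachable; reconstructed volumes
            // missing from this list are marked non-attachable instead.
            {Name: "fake-plugin/fake-device1", DevicePath: "fake/path"},
        },
    },
}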

View File

@ -1,212 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"context"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
// TODO: move to reconstruct.go and remove old code there.
// readyToUnmount returns true when reconciler can start unmounting volumes.
func (rc *reconciler) readyToUnmount() bool {
// During kubelet startup, all volumes present on disk are added as uncertain to ASW.
// Allow unmount only when DSW is fully populated to prevent unmounting volumes that
// did not reach DSW yet.
if !rc.populatorHasAddedPods() {
return false
}
// Allow unmount only when ASW device paths were corrected from node.status to prevent
// calling unmount with a wrong devicePath.
if len(rc.volumesNeedUpdateFromNodeStatus) != 0 {
return false
}
return true
}
// reconstructVolumes tries to reconstruct the actual state of world by scanning all pods' volume
// directories from the disk. For the volumes that cannot support or fail reconstruction, it will
// put the volumes to volumesFailedReconstruction to be cleaned up later when DesiredStateOfWorld
// is populated.
func (rc *reconciler) reconstructVolumes() {
// Get volumes information by reading the pod's directory
podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
if err != nil {
klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
return
}
reconstructedVolumes := make(map[v1.UniqueVolumeName]*globalVolumeInfo)
reconstructedVolumeNames := []v1.UniqueVolumeName{}
for _, volume := range podVolumes {
if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
// There is nothing to reconstruct
continue
}
reconstructedVolume, err := rc.reconstructVolume(volume)
if err != nil {
klog.InfoS("Could not construct volume information", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
// We can't reconstruct the volume. Remember to check DSW after it's fully populated and force unmount the volume when it's orphaned.
rc.volumesFailedReconstruction = append(rc.volumesFailedReconstruction, volume)
continue
}
klog.V(4).InfoS("Adding reconstructed volume to actual state and node status", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
gvl := &globalVolumeInfo{
volumeName: reconstructedVolume.volumeName,
volumeSpec: reconstructedVolume.volumeSpec,
devicePath: reconstructedVolume.devicePath,
deviceMounter: reconstructedVolume.deviceMounter,
blockVolumeMapper: reconstructedVolume.blockVolumeMapper,
mounter: reconstructedVolume.mounter,
}
if cachedInfo, ok := reconstructedVolumes[reconstructedVolume.volumeName]; ok {
gvl = cachedInfo
}
gvl.addPodVolume(reconstructedVolume)
reconstructedVolumeNames = append(reconstructedVolumeNames, reconstructedVolume.volumeName)
reconstructedVolumes[reconstructedVolume.volumeName] = gvl
}
if len(reconstructedVolumes) > 0 {
// Add the volumes to ASW
rc.updateStatesNew(reconstructedVolumes)
// The reconstructed volumes are mounted, hence a previous kubelet must have already put it into node.status.volumesInUse.
// Remember to update DSW with this information.
rc.volumesNeedReportedInUse = reconstructedVolumeNames
// Remember to update devicePath from node.status.volumesAttached
rc.volumesNeedUpdateFromNodeStatus = reconstructedVolumeNames
}
klog.V(2).InfoS("Volume reconstruction finished")
}
func (rc *reconciler) updateStatesNew(reconstructedVolumes map[v1.UniqueVolumeName]*globalVolumeInfo) {
for _, gvl := range reconstructedVolumes {
err := rc.actualStateOfWorld.AddAttachUncertainReconstructedVolume(
//TODO: the devicePath might not be correct for some volume plugins: see issue #54108
gvl.volumeName, gvl.volumeSpec, rc.nodeName, gvl.devicePath)
if err != nil {
klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName)
continue
}
var seLinuxMountContext string
for _, volume := range gvl.podVolumes {
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: volume.podName,
PodUID: types.UID(volume.podName),
VolumeName: volume.volumeName,
Mounter: volume.mounter,
BlockVolumeMapper: volume.blockVolumeMapper,
OuterVolumeSpecName: volume.outerVolumeSpecName,
VolumeGidVolume: volume.volumeGidValue,
VolumeSpec: volume.volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
SELinuxMountContext: volume.seLinuxMountContext,
}
_, err = rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
if err != nil {
klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
continue
}
seLinuxMountContext = volume.seLinuxMountContext
klog.V(2).InfoS("Volume is marked as uncertain and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName, "seLinuxMountContext", volume.seLinuxMountContext)
}
// If the volume has device to mount, we mark its device as uncertain.
if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(gvl)
if err != nil {
klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName)
continue
}
err = rc.actualStateOfWorld.MarkDeviceAsUncertain(gvl.volumeName, gvl.devicePath, deviceMountPath, seLinuxMountContext)
if err != nil {
klog.ErrorS(err, "Could not mark device is uncertain to actual state of world", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
continue
}
klog.V(2).InfoS("Volume is marked device as uncertain and added into the actual state", "volumeName", gvl.volumeName, "deviceMountPath", deviceMountPath)
}
}
}
// cleanOrphanVolumes tries to clean up all volumes that failed reconstruction.
func (rc *reconciler) cleanOrphanVolumes() {
if len(rc.volumesFailedReconstruction) == 0 {
return
}
for _, volume := range rc.volumesFailedReconstruction {
if rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
// Some pod needs the volume, don't clean it up and hope that
// reconcile() calls SetUp and reconstructs the volume in ASW.
klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
continue
}
klog.InfoS("Cleaning up mounts for volume that could not be reconstructed", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
rc.cleanupMounts(volume)
}
klog.V(2).InfoS("Orphan volume cleanup finished")
// Clean the cache, cleanup is one shot operation.
rc.volumesFailedReconstruction = make([]podVolume, 0)
}
// updateReconstructedFromNodeStatus tries to file devicePaths of reconstructed volumes from
// node.Status.VolumesAttached. This can be done only after connection to the API
// server is established, i.e. it can't be part of reconstructVolumes().
func (rc *reconciler) updateReconstructedFromNodeStatus() {
klog.V(4).InfoS("Updating reconstructed devicePaths")
if rc.kubeClient == nil {
// Skip reconstructing devicePath from node objects if kubelet is in standalone mode.
// Such kubelet is not expected to mount any attachable volume or Secrets / ConfigMap.
klog.V(2).InfoS("Skipped reconstruction of DevicePaths from node.status in standalone mode")
rc.volumesNeedUpdateFromNodeStatus = nil
return
}
node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{})
if fetchErr != nil {
// This may repeat few times per second until kubelet is able to read its own status for the first time.
klog.V(4).ErrorS(fetchErr, "Failed to get Node status to reconstruct device paths")
return
}
for _, volumeID := range rc.volumesNeedUpdateFromNodeStatus {
attachable := false
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeID != attachedVolume.Name {
continue
}
rc.actualStateOfWorld.UpdateReconstructedDevicePath(volumeID, attachedVolume.DevicePath)
attachable = true
klog.V(4).InfoS("Updated devicePath from node status for volume", "volumeName", attachedVolume.Name, "path", attachedVolume.DevicePath)
}
rc.actualStateOfWorld.UpdateReconstructedVolumeAttachability(volumeID, attachable)
}
klog.V(2).InfoS("DevicePaths of reconstructed volumes updated")
rc.volumesNeedUpdateFromNodeStatus = nil
}

View File

@ -26,18 +26,13 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
)
func TestReconstructVolumes(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NewVolumeManagerReconstruction, true)()
tests := []struct {
name string
volumePaths []string
@ -148,8 +143,6 @@ func TestReconstructVolumes(t *testing.T) {
}
func TestCleanOrphanVolumes(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NewVolumeManagerReconstruction, true)()
type podInfo struct {
podName string
podUID string
@ -263,7 +256,6 @@ func TestReconstructVolumesMount(t *testing.T) {
// Since the volume is reconstructed, it must be marked as uncertain
// even after a final SetUp error, see https://github.com/kubernetes/kubernetes/issues/96635
// and https://github.com/kubernetes/kubernetes/pull/110670.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NewVolumeManagerReconstruction, true)()
tests := []struct {
name string