From c8b85fb47072ffea083560a961bd16e6935e8f2a Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 2 Jun 2022 12:26:58 -0400 Subject: [PATCH] Keep track of each pod that uses a volume during reconstruction Add tests for volume cleaning up --- .../desired_state_of_world_populator.go | 1 - .../volumemanager/reconciler/reconciler.go | 109 +++++++++------ .../reconciler/reconciler_test.go | 128 ++++++++++++++++++ pkg/volume/testing/testing.go | 13 ++ 4 files changed, 211 insertions(+), 40 deletions(-) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index f2cbe842eff..67e3d7f6212 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -251,7 +251,6 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { continue } klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) - dswp.desiredStateOfWorld.DeletePodFromVolume( volumeToMount.PodName, volumeToMount.VolumeName) dswp.deleteProcessedPod(volumeToMount.PodName) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3c3b9b4d282..164b7cf5dff 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -334,7 +334,7 @@ func (rc *reconciler) unmountDetachDevices() { // it will try to clean up the mount paths with operation executor. 
func (rc *reconciler) sync() { defer rc.updateLastSyncTime() - rc.syncStates() + rc.syncStates(rc.kubeletPodsDir) } func (rc *reconciler) updateLastSyncTime() { @@ -366,19 +366,36 @@ type reconstructedVolume struct { blockVolumeMapper volumepkg.BlockVolumeMapper } +type globalVolumeInfo struct { + volumeName v1.UniqueVolumeName + volumeSpec *volumepkg.Spec + devicePath string + mounter volumepkg.Mounter + deviceMounter volumepkg.DeviceMounter + blockVolumeMapper volumepkg.BlockVolumeMapper + podVolumes map[volumetypes.UniquePodName]*reconstructedVolume +} + +func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) { + if gvi.podVolumes == nil { + gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{} + } + gvi.podVolumes[rcv.podName] = rcv +} + // syncStates scans the volume directories under the given pod directory. // If the volume is not in desired state of world, this function will reconstruct // the volume related information and put it in both the actual and desired state of worlds. 
// For some volume plugins that cannot support reconstruction, it will clean up the existing // mount points since the volume is no long needed (removed from desired state) -func (rc *reconciler) syncStates() { +func (rc *reconciler) syncStates(kubeletPodDir string) { // Get volumes information by reading the pod's directory - podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir) + podVolumes, err := getVolumesFromPodDir(kubeletPodDir) if err != nil { klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") return } - volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume) + volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo) volumeNeedReport := []v1.UniqueVolumeName{} for _, volume := range podVolumes { if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { @@ -416,7 +433,19 @@ func (rc *reconciler) syncStates() { klog.InfoS("Volume is in pending operation, skip cleaning up mounts") } klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) - volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume + gvl := &globalVolumeInfo{ + volumeName: reconstructedVolume.volumeName, + volumeSpec: reconstructedVolume.volumeSpec, + devicePath: reconstructedVolume.devicePath, + deviceMounter: reconstructedVolume.deviceMounter, + blockVolumeMapper: reconstructedVolume.blockVolumeMapper, + mounter: reconstructedVolume.mounter, + } + if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok { + gvl = cachedInfo + } + gvl.addPodVolume(reconstructedVolume) + volumesNeedUpdate[reconstructedVolume.volumeName] = gvl } if len(volumesNeedUpdate) > 0 { @@ -590,7 +619,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, } // updateDevicePath gets the node status to retrieve volume device path information. 
-func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) { +func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) { node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) if fetchErr != nil { klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error") @@ -608,19 +637,19 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName // getDeviceMountPath returns device mount path for block volume which // implements BlockVolumeMapper or filesystem volume which implements // DeviceMounter -func getDeviceMountPath(volume *reconstructedVolume) (string, error) { - if volume.blockVolumeMapper != nil { - // for block volume, we return its global map path - return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec) - } else if volume.deviceMounter != nil { - // for filesystem volume, we return its device mount path if the plugin implements DeviceMounter - return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec) +func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) { + if gvi.blockVolumeMapper != nil { + // for block volume, we return its global map path + return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec) + } else if gvi.deviceMounter != nil { + // for filesystem volume, we return its device mount path if the plugin implements DeviceMounter + return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec) } else { return "", fmt.Errorf("blockVolumeMapper or deviceMounter required") } } -func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error { +func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error { // Get the node status to retrieve volume device path information. // Skip reporting devicePath in node objects if kubeClient is nil. 
// In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc. @@ -628,44 +657,46 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re rc.updateDevicePath(volumesNeedUpdate) } - for _, volume := range volumesNeedUpdate { + for _, gvl := range volumesNeedUpdate { err := rc.actualStateOfWorld.MarkVolumeAsAttached( //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 - volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath) + gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath) if err != nil { - klog.ErrorS(err, "Could not add volume information to actual state of world", "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) continue } - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMounted, + for _, volume := range gvl.podVolumes { + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as mounted and added 
into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) } - err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) - if err != nil { - klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) - continue - } - klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) // If the volume has device to mount, we mark its device as mounted. - if volume.deviceMounter != nil || volume.blockVolumeMapper != nil { - deviceMountPath, err := getDeviceMountPath(volume) + if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(gvl) if err != nil { - klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", volume.volumeName, "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) continue } - err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath) + err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath) if err != nil { - klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName) continue } - klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName) } } return nil diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index ddf3a4beee9..fd9541d5e05 100644 --- 
a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -21,6 +21,9 @@ import ( "fmt" csitrans "k8s.io/csi-translation-lib" "k8s.io/kubernetes/pkg/volume/csimigration" + "os" + "path" + "path/filepath" "testing" "time" @@ -2181,3 +2184,128 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { <-finished waitForMount(t, fakePlugin, generatedVolumeName, asw) } + +func getFakeNode() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(nodeName), + }, + Status: v1.NodeStatus{ + VolumesAttached: []v1.AttachedVolume{ + { + Name: "fake-plugin/fake-device1", + DevicePath: "/fake/path", + }, + }, + }, + } +} + +func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) { + node := getFakeNode() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir) + tmpKubeletPodDir := filepath.Join(kubeletDir, "pods") + + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createTestClient() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + fakeHandler)) + mountPoints := []mount.MountPoint{} + for _, volumePath := range volumePaths { + mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath}) + } + rc := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + mount.NewFakeMounter(mountPoints), + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + tmpKubeletPodDir) + return rc, fakePlugin +} + +func TestSyncStates(t *testing.T) { + tests := []struct { + name 
string + volumePaths []string + createMountPoint bool + verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error + }{ + { + name: "when two pods are using same volume and both are deleted", + volumePaths: []string{ + path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"), + path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"), + }, + createMountPoint: true, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes() + if len(mountedPods) != 2 { + return fmt.Errorf("expected 2 pods to in asw got %d", len(mountedPods)) + } + return nil + }, + }, + { + name: "when reconstruction fails for a volume, volumes should be cleaned up", + volumePaths: []string{ + path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"), + }, + createMountPoint: false, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) { + err := volumetesting.VerifyTearDownCallCount(1, fakePlugin) + if err != nil { + return false, nil + } + return true, nil + }) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tmpKubeletDir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("can't make a temp directory for kubeletPods: %v", err) + } + defer os.RemoveAll(tmpKubeletDir) + + // create kubelet pod directory + tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods") + os.MkdirAll(tmpKubeletPodDir, 0755) + + mountPaths := []string{} + + // create pod and volume directories so that the reconciler can find them. 
+ for _, volumePath := range tc.volumePaths { + vp := filepath.Join(tmpKubeletPodDir, volumePath) + if tc.createMountPoint { + mountPaths = append(mountPaths, vp) + } + os.MkdirAll(vp, 0755) + } + + rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths) + rcInstance, _ := rc.(*reconciler) + rcInstance.syncStates(tmpKubeletPodDir) + if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil { + t.Errorf("test %s failed: %v", tc.name, err) + } + }) + } + +} diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index ef2f12056df..8484aa31c76 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -1660,6 +1660,19 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin) } +func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) { + plugins := ProbeVolumePlugins(VolumeConfig{}) + v := NewFakeKubeletVolumeHost( + t, + rootDir, /* rootDir */ + nil, /* kubeClient */ + plugins, /* plugins */ + ) + v.WithNode(node) + + return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin) +} + // CreateTestPVC returns a provisionable PVC for tests func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim { claim := v1.PersistentVolumeClaim{