From c8b85fb47072ffea083560a961bd16e6935e8f2a Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 2 Jun 2022 12:26:58 -0400 Subject: [PATCH 1/6] Keep track of each pod that uses a volume during reconstruction Add tests for volume cleaning up --- .../desired_state_of_world_populator.go | 1 - .../volumemanager/reconciler/reconciler.go | 109 +++++++++------ .../reconciler/reconciler_test.go | 128 ++++++++++++++++++ pkg/volume/testing/testing.go | 13 ++ 4 files changed, 211 insertions(+), 40 deletions(-) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index f2cbe842eff..67e3d7f6212 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -251,7 +251,6 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { continue } klog.V(4).InfoS("Removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName) - dswp.desiredStateOfWorld.DeletePodFromVolume( volumeToMount.PodName, volumeToMount.VolumeName) dswp.deleteProcessedPod(volumeToMount.PodName) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3c3b9b4d282..164b7cf5dff 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -334,7 +334,7 @@ func (rc *reconciler) unmountDetachDevices() { // it will try to clean up the mount paths with operation executor. func (rc *reconciler) sync() { defer rc.updateLastSyncTime() - rc.syncStates() + rc.syncStates(rc.kubeletPodsDir) } func (rc *reconciler) updateLastSyncTime() { @@ -366,19 +366,36 @@ type reconstructedVolume struct { blockVolumeMapper volumepkg.BlockVolumeMapper } +type globalVolumeInfo struct { + volumeName v1.UniqueVolumeName + volumeSpec *volumepkg.Spec + devicePath string + mounter volumepkg.Mounter + deviceMounter volumepkg.DeviceMounter + blockVolumeMapper volumepkg.BlockVolumeMapper + podVolumes map[volumetypes.UniquePodName]*reconstructedVolume +} + +func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) { + if gvi.podVolumes == nil { + gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{} + } + gvi.podVolumes[rcv.podName] = rcv +} + // syncStates scans the volume directories under the given pod directory. // If the volume is not in desired state of world, this function will reconstruct // the volume related information and put it in both the actual and desired state of worlds. 
// For some volume plugins that cannot support reconstruction, it will clean up the existing // mount points since the volume is no long needed (removed from desired state) -func (rc *reconciler) syncStates() { +func (rc *reconciler) syncStates(kubeletPodDir string) { // Get volumes information by reading the pod's directory - podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir) + podVolumes, err := getVolumesFromPodDir(kubeletPodDir) if err != nil { klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction") return } - volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume) + volumesNeedUpdate := make(map[v1.UniqueVolumeName]*globalVolumeInfo) volumeNeedReport := []v1.UniqueVolumeName{} for _, volume := range podVolumes { if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) { @@ -416,7 +433,19 @@ func (rc *reconciler) syncStates() { klog.InfoS("Volume is in pending operation, skip cleaning up mounts") } klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) - volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume + gvl := &globalVolumeInfo{ + volumeName: reconstructedVolume.volumeName, + volumeSpec: reconstructedVolume.volumeSpec, + devicePath: reconstructedVolume.devicePath, + deviceMounter: reconstructedVolume.deviceMounter, + blockVolumeMapper: reconstructedVolume.blockVolumeMapper, + mounter: reconstructedVolume.mounter, + } + if cachedInfo, ok := volumesNeedUpdate[reconstructedVolume.volumeName]; ok { + gvl = cachedInfo + } + gvl.addPodVolume(reconstructedVolume) + volumesNeedUpdate[reconstructedVolume.volumeName] = gvl } if len(volumesNeedUpdate) > 0 { @@ -590,7 +619,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, } // updateDevicePath gets the node status to retrieve volume device path information. 
-func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) { +func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) { node, fetchErr := rc.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(rc.nodeName), metav1.GetOptions{}) if fetchErr != nil { klog.ErrorS(fetchErr, "UpdateStates in reconciler: could not get node status with error") @@ -608,19 +637,19 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName // getDeviceMountPath returns device mount path for block volume which // implements BlockVolumeMapper or filesystem volume which implements // DeviceMounter -func getDeviceMountPath(volume *reconstructedVolume) (string, error) { - if volume.blockVolumeMapper != nil { - // for block volume, we return its global map path - return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec) - } else if volume.deviceMounter != nil { - // for filesystem volume, we return its device mount path if the plugin implements DeviceMounter - return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec) +func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) { + if gvi.blockVolumeMapper != nil { + // for block gvi, we return its global map path + return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec) + } else if gvi.deviceMounter != nil { + // for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter + return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec) } else { return "", fmt.Errorf("blockVolumeMapper or deviceMounter required") } } -func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error { +func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*globalVolumeInfo) error { // Get the node status to retrieve volume device path information. // Skip reporting devicePath in node objects if kubeClient is nil. // In standalone mode, kubelet is not expected to mount any attachable volume types or secret, configmaps etc. 
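Taken together, the hunks in this file replace the flat map of per-pod reconstructedVolume entries with one globalVolumeInfo record per unique volume name, so that two pods sharing a volume no longer overwrite each other during reconstruction. A minimal, self-contained sketch of that grouping idea, with plain string types and print statements standing in for the kubelet cache calls (none of the names below are the real API):

package main

import "fmt"

// podVolume is a simplified stand-in for reconstructedVolume: the state
// reconstructed for one pod's use of one volume.
type podVolume struct {
    podName    string
    volumeName string
}

// volumeGroup is a simplified stand-in for globalVolumeInfo: one record per
// unique volume, holding every pod-level entry that refers to it.
type volumeGroup struct {
    volumeName string
    devicePath string
    podVolumes map[string]podVolume // keyed by pod name
}

func (g *volumeGroup) addPodVolume(pv podVolume) {
    if g.podVolumes == nil {
        g.podVolumes = map[string]podVolume{}
    }
    g.podVolumes[pv.podName] = pv
}

func main() {
    // Two pods were found on disk using the same volume.
    reconstructed := []podVolume{
        {podName: "pod1", volumeName: "pvc-abcdef"},
        {podName: "pod2", volumeName: "pvc-abcdef"},
    }

    groups := map[string]*volumeGroup{}
    for _, pv := range reconstructed {
        g, ok := groups[pv.volumeName]
        if !ok {
            g = &volumeGroup{volumeName: pv.volumeName, devicePath: "/fake/path"}
            groups[pv.volumeName] = g
        }
        g.addPodVolume(pv)
    }

    // Device-level state is recorded once per unique volume, pod-level mount
    // state once per pod under it, mirroring how updateStates walks
    // gvl.podVolumes in the hunk below.
    for _, g := range groups {
        fmt.Printf("mark device mounted once for %s at %s\n", g.volumeName, g.devicePath)
        for pod := range g.podVolumes {
            fmt.Printf("  mark volume mounted for pod %s\n", pod)
        }
    }
}

With this shape, MarkVolumeAsAttached and MarkDeviceAsMounted run once per unique volume, while MarkVolumeAsMounted runs for every pod found under it.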
@@ -628,44 +657,46 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re rc.updateDevicePath(volumesNeedUpdate) } - for _, volume := range volumesNeedUpdate { + for _, gvl := range volumesNeedUpdate { err := rc.actualStateOfWorld.MarkVolumeAsAttached( //TODO: the devicePath might not be correct for some volume plugins: see issue #54108 - volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath) + gvl.volumeName, gvl.volumeSpec, "" /* nodeName */, gvl.devicePath) if err != nil { - klog.ErrorS(err, "Could not add volume information to actual state of world", "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not add volume information to actual state of world", "volumeName", gvl.volumeName) continue } - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMounted, + for _, volume := range gvl.podVolumes { + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) } - err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) - if err != nil { - klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) - continue - } - klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) // If the volume has device to mount, we mark its device as mounted. 
- if volume.deviceMounter != nil || volume.blockVolumeMapper != nil { - deviceMountPath, err := getDeviceMountPath(volume) + if gvl.deviceMounter != nil || gvl.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(gvl) if err != nil { - klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", volume.volumeName, "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", gvl.volumeName) continue } - err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName, volume.devicePath, deviceMountPath) + err = rc.actualStateOfWorld.MarkDeviceAsMounted(gvl.volumeName, gvl.devicePath, deviceMountPath) if err != nil { - klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "pod", klog.KObj(volume.pod)) + klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", gvl.volumeName) continue } - klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", gvl.volumeName) } } return nil diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index ddf3a4beee9..fd9541d5e05 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -21,6 +21,9 @@ import ( "fmt" csitrans "k8s.io/csi-translation-lib" "k8s.io/kubernetes/pkg/volume/csimigration" + "os" + "path" + "path/filepath" "testing" "time" @@ -2181,3 +2184,128 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) { <-finished waitForMount(t, fakePlugin, generatedVolumeName, asw) } + +func getFakeNode() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(nodeName), + }, + Status: v1.NodeStatus{ + VolumesAttached: []v1.AttachedVolume{ + { + Name: "fake-plugin/fake-device1", + DevicePath: "/fake/path", + }, + }, + }, + } +} + +func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) { + node := getFakeNode() + volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir) + tmpKubeletPodDir := filepath.Join(kubeletDir, "pods") + + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createTestClient() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + fakeHandler)) + mountPoints := []mount.MountPoint{} + for _, volumePath := range volumePaths { + mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath}) + } + rc := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + mount.NewFakeMounter(mountPoints), + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + tmpKubeletPodDir) + return rc, fakePlugin +} + +func TestSyncStates(t *testing.T) { + tests := []struct { + name string + volumePaths []string + createMountPoint bool + verifyFunc func(rcInstance *reconciler, fakePlugin 
*volumetesting.FakeVolumePlugin) error + }{ + { + name: "when two pods are using same volume and both are deleted", + volumePaths: []string{ + path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"), + path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"), + }, + createMountPoint: true, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes() + if len(mountedPods) != 2 { + return fmt.Errorf("expected 2 pods to in asw got %d", len(mountedPods)) + } + return nil + }, + }, + { + name: "when reconstruction fails for a volume, volumes should be cleaned up", + volumePaths: []string{ + path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"), + }, + createMountPoint: false, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) { + err := volumetesting.VerifyTearDownCallCount(1, fakePlugin) + if err != nil { + return false, nil + } + return true, nil + }) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tmpKubeletDir, err := os.MkdirTemp("", "") + if err != nil { + t.Fatalf("can't make a temp directory for kubeletPods: %v", err) + } + defer os.RemoveAll(tmpKubeletDir) + + // create kubelet pod directory + tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods") + os.MkdirAll(tmpKubeletPodDir, 0755) + + mountPaths := []string{} + + // create pod and volume directories so as reconciler can find them. + for _, volumePath := range tc.volumePaths { + vp := filepath.Join(tmpKubeletPodDir, volumePath) + if tc.createMountPoint { + mountPaths = append(mountPaths, vp) + } + os.MkdirAll(vp, 0755) + } + + rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths) + rcInstance, _ := rc.(*reconciler) + rcInstance.syncStates(tmpKubeletPodDir) + if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil { + t.Errorf("test %s failed: %v", tc.name, err) + } + }) + } + +} diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index ef2f12056df..8484aa31c76 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -1660,6 +1660,19 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin) } +func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) { + plugins := ProbeVolumePlugins(VolumeConfig{}) + v := NewFakeKubeletVolumeHost( + t, + rootDir, /* rootDir */ + nil, /* kubeClient */ + plugins, /* plugins */ + ) + v.WithNode(node) + + return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin) +} + // CreateTestPVC returns a provisionable PVC for tests func CreateTestPVC(capacity string, accessModes []v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim { claim := v1.PersistentVolumeClaim{ From eb071c27558fe6f87c7d3b08b4cf646446ff995e Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Tue, 14 Jun 2022 15:37:21 -0400 Subject: [PATCH 2/6] Fix code to process volumes which were skipped during reconstruction --- .../cache/actual_state_of_world.go | 52 +++++++- .../volumemanager/reconciler/reconciler.go | 123 ++++++++++++++---- .../operationexecutor/operation_executor.go | 4 + .../operationexecutor/operation_generator.go | 4 +- 4 files changed, 149 insertions(+), 34 deletions(-) diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go 
b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 7deb6df910d..c279b9f862a 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -196,9 +196,10 @@ func NewActualStateOfWorld( nodeName types.NodeName, volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld { return &actualStateOfWorld{ - nodeName: nodeName, - attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume), - volumePluginMgr: volumePluginMgr, + nodeName: nodeName, + attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume), + foundDuringReconstruction: make(map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID), + volumePluginMgr: volumePluginMgr, } } @@ -226,7 +227,8 @@ type actualStateOfWorld struct { // state by default. // The key in this map is the name of the volume and the value is an object // containing more information about the attached volume. - attachedVolumes map[v1.UniqueVolumeName]attachedVolume + attachedVolumes map[v1.UniqueVolumeName]attachedVolume + foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID // volumePluginMgr is the volume plugin manager used to create volume // plugin objects. @@ -346,6 +348,40 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } +func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error { + err := asw.MarkVolumeAsMounted(opts) + if err != nil { + return err + } + asw.Lock() + defer asw.Unlock() + + podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] + if !ok { + podMap = map[volumetypes.UniquePodName]types.UID{} + } + podMap[opts.PodName] = opts.PodUID + asw.foundDuringReconstruction[opts.VolumeName] = podMap + return nil +} + +func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { + asw.RLock() + defer asw.RUnlock() + + volumeState := asw.GetVolumeMountState(volumeName, podName) + // only uncertain volumes are reconstructed + if volumeState != operationexecutor.VolumeMountUncertain { + return false + } + podMap, ok := asw.foundDuringReconstruction[volumeName] + if !ok { + return false + } + _, foundPod := podMap[podName] + return foundPod +} + func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error { return asw.AddPodToVolume(markVolumeOpts) } @@ -641,6 +677,12 @@ func (asw *actualStateOfWorld) DeletePodFromVolume( delete(asw.attachedVolumes[volumeName].mountedPods, podName) } + // if there were reconstructed volumes, we should remove them + _, podExists = asw.foundDuringReconstruction[volumeName] + if podExists { + delete(asw.foundDuringReconstruction[volumeName], podName) + } + return nil } @@ -661,6 +703,7 @@ func (asw *actualStateOfWorld) DeleteVolume(volumeName v1.UniqueVolumeName) erro } delete(asw.attachedVolumes, volumeName) + delete(asw.foundDuringReconstruction, volumeName) return nil } @@ -739,7 +782,6 @@ func (asw *actualStateOfWorld) PodRemovedFromVolume( return false } } - return true } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 164b7cf5dff..a342d58ba9b 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -119,6 +119,7 @@ func NewReconciler( operationExecutor: operationExecutor, mounter: mounter, hostutil: hostutil, + skippedDuringReconstruction: 
map[v1.UniqueVolumeName]*globalVolumeInfo{}, volumePluginMgr: volumePluginMgr, kubeletPodsDir: kubeletPodsDir, timeOfLastSync: time.Time{}, @@ -138,6 +139,7 @@ type reconciler struct { mounter mount.Interface hostutil hostutil.HostUtils volumePluginMgr *volumepkg.VolumePluginMgr + skippedDuringReconstruction map[v1.UniqueVolumeName]*globalVolumeInfo kubeletPodsDir string timeOfLastSync time.Time } @@ -175,6 +177,10 @@ func (rc *reconciler) reconcile() { // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() + + if len(rc.skippedDuringReconstruction) > 0 { + rc.processReconstructedVolumes() + } } func (rc *reconciler) unmountVolumes() { @@ -249,6 +255,63 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po } } +// processReconstructedVolumes checks volumes which were skipped during the reconstruction +// process because it was assumed that since these volumes were present in DSOW they would get +// mounted correctly and make it into ASOW. +// But if mount operation fails for some reason then we still need to mark the volume as uncertain +// and wait for the next reconciliation loop to deal with it. +func (rc *reconciler) processReconstructedVolumes() { + if rc.kubeClient != nil { + rc.updateDevicePath(rc.skippedDuringReconstruction) + } + for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction { + // check if volume is marked as attached to the node + // for now lets only process volumes which are at least known as attached to the node + // this should help with most volume types (including secret, configmap etc) + if !rc.actualStateOfWorld.VolumeExists(volumeName) { + klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName) + delete(rc.skippedDuringReconstruction, volumeName) + continue + } + uncertainVolumeCount := 0 + + for podName, volume := range glblVolumeInfo.podVolumes { + volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName) + // if volume is not mounted then lets mark volume mounted in uncertain state in ASOW + if volumeNotMounted { + err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain) + uncertainVolumeCount += 1 + if err != nil { + klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) + continue + } + klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + } + } + + if uncertainVolumeCount > 0 { + // If the volume has device to mount, we mark its device as mounted. 
+ if glblVolumeInfo.deviceMounter != nil || glblVolumeInfo.blockVolumeMapper != nil { + deviceMountPath, err := getDeviceMountPath(glblVolumeInfo) + if err != nil { + klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName) + continue + } + currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName) + if currentMountState == operationexecutor.DeviceNotMounted { + err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath) + if err != nil { + klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName) + continue + } + klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName) + } + } + } + } + rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo) +} + func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable { //// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens @@ -418,21 +481,6 @@ func (rc *reconciler) syncStates(kubeletPodDir string) { rc.cleanupMounts(volume) continue } - if volumeInDSW { - // Some pod needs the volume. And it exists on disk. Some previous - // kubelet must have created the directory, therefore it must have - // reported the volume as in use. Mark the volume as in use also in - // this new kubelet so reconcile() calls SetUp and re-mounts the - // volume if it's necessary. - volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) - klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) - continue - } - // There is no pod that uses the volume. - if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { - klog.InfoS("Volume is in pending operation, skip cleaning up mounts") - } - klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) gvl := &globalVolumeInfo{ volumeName: reconstructedVolume.volumeName, volumeSpec: reconstructedVolume.volumeSpec, @@ -445,6 +493,22 @@ func (rc *reconciler) syncStates(kubeletPodDir string) { gvl = cachedInfo } gvl.addPodVolume(reconstructedVolume) + if volumeInDSW { + // Some pod needs the volume. And it exists on disk. Some previous + // kubelet must have created the directory, therefore it must have + // reported the volume as in use. Mark the volume as in use also in + // this new kubelet so reconcile() calls SetUp and re-mounts the + // volume if it's necessary. + volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName) + rc.skippedDuringReconstruction[reconstructedVolume.volumeName] = gvl + klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName) + continue + } + // There is no pod that uses the volume. 
+ if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { + klog.InfoS("Volume is in pending operation, skip cleaning up mounts") + } + klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume) volumesNeedUpdate[reconstructedVolume.volumeName] = gvl } @@ -666,18 +730,7 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl continue } for _, volume := range gvl.podVolumes { - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMounted, - } - err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + err = rc.markVolumeState(volume, operationexecutor.VolumeMounted) if err != nil { klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) continue @@ -702,6 +755,22 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*gl return nil } +func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState operationexecutor.VolumeMountState) error { + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) + return err +} + // getVolumesFromPodDir scans through the volumes directories under the given pod directory. // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path, // and volume spec name. diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 3cbc4e61a57..4f0e8cb9e0a 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -215,6 +215,10 @@ type ActualStateOfWorldMounterUpdater interface { // MarkForInUseExpansionError marks the volume to have in-use error during expansion. 
// volume expansion must not be retried for this volume MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) + + AddVolumeViaReconstruction(opts MarkVolumeOpts) error + + IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool } // ActualStateOfWorldAttacherUpdater defines a set of operations updating the diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 636aa4176d8..90404714b93 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -784,7 +784,8 @@ func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { if volumetypes.IsOperationFinishedError(mountError) && - actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain { + actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain && + !actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) { t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) if t != nil { klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error()) @@ -799,7 +800,6 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error()) } } - } func (og *operationGenerator) GenerateUnmountVolumeFunc( From b8257e8c018d0e01f7aa037af2ba42594455a541 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 29 Jun 2022 15:02:38 -0400 Subject: [PATCH 3/6] Address review comments --- pkg/kubelet/volumemanager/reconciler/reconciler.go | 2 +- .../util/operationexecutor/operation_executor.go | 4 ++++ .../util/operationexecutor/operation_generator.go | 11 +++++++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index a342d58ba9b..514f0139c15 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -765,7 +765,7 @@ func (rc *reconciler) markVolumeState(volume *reconstructedVolume, volumeState o OuterVolumeSpecName: volume.outerVolumeSpecName, VolumeGidVolume: volume.volumeGidValue, VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMounted, + VolumeMountState: volumeState, } err := rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) return err diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 4f0e8cb9e0a..541d86e2ace 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -216,8 +216,12 @@ type ActualStateOfWorldMounterUpdater interface { // volume expansion must not be retried for this volume MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) + // AddVolumeViaReconstruction adds the volume to actual state of the world and also + // marks the volume as one found during reconstruction. 
AddVolumeViaReconstruction(opts MarkVolumeOpts) error + // IsVolumeReconstructed returns true if volume currently added to actual state of the world + // was found during reconstruction. IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool } diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 90404714b93..71ca9ce40d5 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -784,13 +784,20 @@ func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { if volumetypes.IsOperationFinishedError(mountError) && - actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain && - !actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) { + actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain { + // if volume was previously reconstructed we are not going to change its state as unmounted even + // if mount operation fails. + if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) { + klog.V(3).Infof("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName) + return + } + t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) if t != nil { klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error()) } return + } if volumetypes.IsUncertainProgressError(mountError) && From b455270f6e549eed1cff8159f672b955a6907fad Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 29 Jun 2022 16:17:50 -0400 Subject: [PATCH 4/6] Add unit test for verifying if processReconstructedVolumes works as expected --- .../volumemanager/reconciler/reconciler.go | 2 +- .../reconciler/reconciler_test.go | 73 +++++++++++++++++-- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 514f0139c15..a9b9a729298 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -290,7 +290,7 @@ func (rc *reconciler) processReconstructedVolumes() { } if uncertainVolumeCount > 0 { - // If the volume has device to mount, we mark its device as mounted. 
+ // If the volume has device to mount, we mark its device as uncertain if glblVolumeInfo.deviceMounter != nil || glblVolumeInfo.blockVolumeMapper != nil { deviceMountPath, err := getDeviceMountPath(glblVolumeInfo) if err != nil { diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index fd9541d5e05..1a488080943 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -2239,10 +2239,12 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Recon func TestSyncStates(t *testing.T) { tests := []struct { - name string - volumePaths []string - createMountPoint bool - verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error + name string + volumePaths []string + createMountPoint bool + addToDSOW bool + postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error + verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error }{ { name: "when two pods are using same volume and both are deleted", @@ -2275,6 +2277,29 @@ func TestSyncStates(t *testing.T) { }) }, }, + { + name: "when volume exists in dsow, volume should be recorded in skipped during reconstruction", + volumePaths: []string{ + path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"), + }, + createMountPoint: true, + addToDSOW: true, + postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + skippedVolumes := rcInstance.skippedDuringReconstruction + if len(skippedVolumes) != 1 { + return fmt.Errorf("expected 1 pods to in skippedDuringReconstruction got %d", len(skippedVolumes)) + } + rcInstance.processReconstructedVolumes() + return nil + }, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + mountedPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes() + if len(mountedPods) != 1 { + return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods)) + } + return nil + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -2284,6 +2309,28 @@ func TestSyncStates(t *testing.T) { } defer os.RemoveAll(tmpKubeletDir) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "volume-name", + }, + }, + }, + }, + }, + } + + volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} + podName := util.GetUniquePodName(pod) + // create kubelet pod directory tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods") os.MkdirAll(tmpKubeletPodDir, 0755) @@ -2301,11 +2348,27 @@ func TestSyncStates(t *testing.T) { rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths) rcInstance, _ := rc.(*reconciler) + + if tc.addToDSOW { + volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + if err != nil { + t.Fatalf("error adding volume %s to dsow: %v", volumeSpec.Name(), err) + } + rcInstance.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, "") + } + rcInstance.syncStates(tmpKubeletPodDir) + if tc.postSyncStatCallback != nil { + err := tc.postSyncStatCallback(rcInstance, fakePlugin) + if err != nil { + t.Errorf("test %s, 
postSyncStatCallback failed: %v", tc.name, err) + } + } + if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil { t.Errorf("test %s failed: %v", tc.name, err) } }) } - } From 6d43345c069035f3590b81980c1b37266dd3ab60 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 30 Jun 2022 07:29:10 -0400 Subject: [PATCH 5/6] Remove volume from found during reconstruction if mounted Add unit tests for removing reconstructed volumes from ASOW --- .../cache/actual_state_of_world.go | 16 +- .../cache/actual_state_of_world_test.go | 152 ++++++++++++++++++ .../volumemanager/reconciler/reconciler.go | 19 ++- .../reconciler/reconciler_test.go | 10 +- 4 files changed, 190 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index c279b9f862a..33d58ddd019 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -227,7 +227,9 @@ type actualStateOfWorld struct { // state by default. // The key in this map is the name of the volume and the value is an object // containing more information about the attached volume. - attachedVolumes map[v1.UniqueVolumeName]attachedVolume + attachedVolumes map[v1.UniqueVolumeName]attachedVolume + // foundDuringReconstruction is a map of volumes which were discovered + // from kubelet root directory when kubelet was restarted. foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID // volumePluginMgr is the volume plugin manager used to create volume @@ -366,14 +368,15 @@ func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor } func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { - asw.RLock() - defer asw.RUnlock() - volumeState := asw.GetVolumeMountState(volumeName, podName) + // only uncertain volumes are reconstructed if volumeState != operationexecutor.VolumeMountUncertain { return false } + + asw.RLock() + defer asw.RUnlock() podMap, ok := asw.foundDuringReconstruction[volumeName] if !ok { return false @@ -571,6 +574,11 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M // If pod exists, reset remountRequired value podObj.remountRequired = false podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState + + // if volume is mounted successfully, then it should be removed from foundDuringReconstruction map + if markVolumeOpts.VolumeMountState == operationexecutor.VolumeMounted { + delete(asw.foundDuringReconstruction[volumeName], podName) + } if mounter != nil { // The mounter stored in the object may have old information, // use the newest one. diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index cd9fe6c52c8..076157357d5 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -17,7 +17,9 @@ limitations under the License. 
package cache import ( + "fmt" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "testing" "github.com/stretchr/testify/require" @@ -458,6 +460,149 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw) } +func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { + tests := []struct { + name string + opCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error + verifyCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error + }{ + { + name: "marking volume mounted should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + volumeOpts.VolumeMountState = operationexecutor.VolumeMounted + return asw.MarkVolumeAsMounted(volumeOpts) + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + return nil + }, + }, + { + name: "removing volume from pod should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + return asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName) + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + return nil + }, + }, + { + name: "removing volume entirely from ASOW should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + err := asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName) + if err != nil { + return err + } + asw.MarkVolumeAsDetached(volumeOpts.VolumeName, "") + return nil + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + aswInstance, _ := asw.(*actualStateOfWorld) + _, found := aswInstance.foundDuringReconstruction[volumeOpts.VolumeName] + if found { + return fmt.Errorf("found unexpected volume in reconstructed map") + } + return nil + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + volumePluginMgr, plugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr) + devicePath := "fake/device/path" + + pod1 := getTestPod("pod1", "pod1uid", "volume-name-1", "fake-device1") + volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]} + generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec( + plugin, volumeSpec1) + require.NoError(t, err) + + err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) + if err != nil { + t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) + } + podName1 := util.GetUniquePodName(pod1) + + mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewMounter failed. 
Expected: Actual: <%v>", err) + } + + mapper1, err := plugin.NewBlockVolumeMapper(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) + } + + markVolumeOpts1 := operationexecutor.MarkVolumeOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + BlockVolumeMapper: mapper1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err = asw.AddVolumeViaReconstruction(markVolumeOpts1) + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + // make sure state is as we expect it to be + verifyVolumeExistsAsw(t, generatedVolumeName1, true /* shouldExist */, asw) + verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName1, asw) + verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName1, asw) + verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName1, volumeSpec1.Name(), asw) + verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw) + verifyVolumeFoundInReconstruction(t, podName1, generatedVolumeName1, asw) + + if tc.opCallback != nil { + err = tc.opCallback(asw, markVolumeOpts1) + if err != nil { + t.Fatalf("for test %s: %v", tc.name, err) + } + } + err = tc.verifyCallback(asw, markVolumeOpts1) + if err != nil { + t.Fatalf("for test %s verification failed: %v", tc.name, err) + } + }) + } +} + +func getTestPod(podName, podUID, outerVolumeName, pdName string) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + UID: types.UID(podUID), + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: outerVolumeName, + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: pdName, + }, + }, + }, + }, + }, + } + return pod +} + // Calls AddPodToVolume() to add pod to empty data struct // Verifies call fails with "volume does not exist" error. func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) { @@ -873,3 +1018,10 @@ func verifyVolumeSpecNameInVolumeAsw( } } } + +func verifyVolumeFoundInReconstruction(t *testing.T, podToCheck volumetypes.UniquePodName, volumeToCheck v1.UniqueVolumeName, asw ActualStateOfWorld) { + isRecontructed := asw.IsVolumeReconstructed(volumeToCheck, podToCheck) + if !isRecontructed { + t.Fatalf("ASW IsVolumeReconstructed result invalid. expected Actual ") + } +} diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index a9b9a729298..fe4c1c0ace5 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -178,6 +178,10 @@ func (rc *reconciler) reconcile() { // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() + // After running the above operations if skippedDuringReconstruction is not empty + // then ensure that all volumes which were discovered and skipped during reconstruction + // are added to actualStateOfWorld in uncertain state. + // This should be called only ONCE after reconstruction. 
if len(rc.skippedDuringReconstruction) > 0 { rc.processReconstructedVolumes() } @@ -279,7 +283,18 @@ func (rc *reconciler) processReconstructedVolumes() { volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName) // if volume is not mounted then lets mark volume mounted in uncertain state in ASOW if volumeNotMounted { - err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts) uncertainVolumeCount += 1 if err != nil { klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) @@ -429,6 +444,8 @@ type reconstructedVolume struct { blockVolumeMapper volumepkg.BlockVolumeMapper } +// globalVolumeInfo stores reconstructed volume information +// for each pod that was using that volume. type globalVolumeInfo struct { volumeName v1.UniqueVolumeName volumeSpec *volumepkg.Spec diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 1a488080943..c76751edafa 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -19,14 +19,15 @@ package reconciler import ( "crypto/md5" "fmt" - csitrans "k8s.io/csi-translation-lib" - "k8s.io/kubernetes/pkg/volume/csimigration" "os" "path" "path/filepath" "testing" "time" + csitrans "k8s.io/csi-translation-lib" + "k8s.io/kubernetes/pkg/volume/csimigration" + "github.com/stretchr/testify/assert" "k8s.io/mount-utils" @@ -2297,6 +2298,11 @@ func TestSyncStates(t *testing.T) { if len(mountedPods) != 1 { return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods)) } + mountedPodVolume := mountedPods[0] + addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName) + if !addedViaReconstruction { + return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName) + } return nil }, }, From 835e8ccc76577b24376dee8b01b10dc2e273c969 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 13 Jul 2022 13:23:47 -0400 Subject: [PATCH 6/6] Use CheckAndMarkAsUncertainViaReconstruction for uncertain volumes Also only remove volumes from skippedDuringReconstruction only if volume was marked as attached. 
--- .../cache/actual_state_of_world.go | 95 +++++++++++++--- .../cache/actual_state_of_world_test.go | 4 +- .../volumemanager/reconciler/reconciler.go | 52 ++++----- .../reconciler/reconciler_test.go | 103 +++++++++++++----- pkg/volume/testing/testing.go | 4 +- .../operationexecutor/operation_executor.go | 15 ++- .../operationexecutor/operation_generator.go | 2 +- 7 files changed, 197 insertions(+), 78 deletions(-) diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 33d58ddd019..40c7d943c7e 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -350,23 +350,6 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } -func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error { - err := asw.MarkVolumeAsMounted(opts) - if err != nil { - return err - } - asw.Lock() - defer asw.Unlock() - - podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] - if !ok { - podMap = map[volumetypes.UniquePodName]types.UID{} - } - podMap[opts.PodName] = opts.PodUID - asw.foundDuringReconstruction[opts.VolumeName] = podMap - return nil -} - func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { volumeState := asw.GetVolumeMountState(volumeName, podName) @@ -385,6 +368,84 @@ func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeN return foundPod } +func (asw *actualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts operationexecutor.MarkVolumeOpts) (bool, error) { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[opts.VolumeName] + if !volumeExists { + return false, nil + } + + podObj, podExists := volumeObj.mountedPods[opts.PodName] + if podExists { + // if volume mount was uncertain we should keep trying to unmount the volume + if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { + return false, nil + } + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { + return false, nil + } + } + + podName := opts.PodName + podUID := opts.PodUID + volumeName := opts.VolumeName + mounter := opts.Mounter + blockVolumeMapper := opts.BlockVolumeMapper + outerVolumeSpecName := opts.OuterVolumeSpecName + volumeGidValue := opts.VolumeGidVolume + volumeSpec := opts.VolumeSpec + + podObj = mountedPod{ + podName: podName, + podUID: podUID, + mounter: mounter, + blockVolumeMapper: blockVolumeMapper, + outerVolumeSpecName: outerVolumeSpecName, + volumeGidValue: volumeGidValue, + volumeSpec: volumeSpec, + remountRequired: false, + volumeMountStateForPod: operationexecutor.VolumeMountUncertain, + } + + if mounter != nil { + // The mounter stored in the object may have old information, + // use the newest one. 
+ podObj.mounter = mounter + } + + asw.attachedVolumes[volumeName].mountedPods[podName] = podObj + + podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] + if !ok { + podMap = map[volumetypes.UniquePodName]types.UID{} + } + podMap[opts.PodName] = opts.PodUID + asw.foundDuringReconstruction[opts.VolumeName] = podMap + return true, nil +} + +func (asw *actualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + // CheckAndMarkDeviceUncertainViaReconstruction requires volume to be marked as attached, so if + // volume does not exist in ASOW or is in any state other than DeviceNotMounted we should return + if !volumeExists || volumeObj.deviceMountState != operationexecutor.DeviceNotMounted { + return false + } + + volumeObj.deviceMountState = operationexecutor.DeviceMountUncertain + // we are only changing deviceMountPath because devicePath at at this stage is + // determined from node object. + volumeObj.deviceMountPath = deviceMountPath + asw.attachedVolumes[volumeName] = volumeObj + return true + +} + func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error { return asw.AddPodToVolume(markVolumeOpts) } diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 076157357d5..019cfabf342 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -460,6 +460,8 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw) } +// Test if volumes that were recorded to be read from disk during reconstruction +// are handled correctly by the ASOW. func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { tests := []struct { name string @@ -555,7 +557,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { VolumeSpec: volumeSpec1, VolumeMountState: operationexecutor.VolumeMountUncertain, } - err = asw.AddVolumeViaReconstruction(markVolumeOpts1) + _, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts1) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index fe4c1c0ace5..08efe0fe0cb 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -181,7 +181,6 @@ func (rc *reconciler) reconcile() { // After running the above operations if skippedDuringReconstruction is not empty // then ensure that all volumes which were discovered and skipped during reconstruction // are added to actualStateOfWorld in uncertain state. - // This should be called only ONCE after reconstruction. if len(rc.skippedDuringReconstruction) > 0 { rc.processReconstructedVolumes() } @@ -265,42 +264,43 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po // But if mount operation fails for some reason then we still need to mark the volume as uncertain // and wait for the next reconciliation loop to deal with it. 
 func (rc *reconciler) processReconstructedVolumes() {
-	if rc.kubeClient != nil {
-		rc.updateDevicePath(rc.skippedDuringReconstruction)
-	}
 	for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction {
 		// check if volume is marked as attached to the node
 		// for now lets only process volumes which are at least known as attached to the node
 		// this should help with most volume types (including secret, configmap etc)
 		if !rc.actualStateOfWorld.VolumeExists(volumeName) {
 			klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName)
-			delete(rc.skippedDuringReconstruction, volumeName)
 			continue
 		}
 		uncertainVolumeCount := 0
+		// only delete volumes from skippedDuringReconstruction which were marked as attached here.
+		// This ensures that we keep waiting for volumes which have not yet been marked as attached
+		// before adding them to the ASOW in uncertain state during reconstruction.
+		delete(rc.skippedDuringReconstruction, volumeName)
 		for podName, volume := range glblVolumeInfo.podVolumes {
-			volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName)
+			markVolumeOpts := operationexecutor.MarkVolumeOpts{
+				PodName:             volume.podName,
+				PodUID:              types.UID(podName),
+				VolumeName:          volume.volumeName,
+				Mounter:             volume.mounter,
+				BlockVolumeMapper:   volume.blockVolumeMapper,
+				OuterVolumeSpecName: volume.outerVolumeSpecName,
+				VolumeGidVolume:     volume.volumeGidValue,
+				VolumeSpec:          volume.volumeSpec,
+				VolumeMountState:    operationexecutor.VolumeMountUncertain,
+			}
+
+			volumeAdded, err := rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
+
 			// if volume is not mounted then lets mark volume mounted in uncertain state in ASOW
-			if volumeNotMounted {
-				markVolumeOpts := operationexecutor.MarkVolumeOpts{
-					PodName:             volume.podName,
-					PodUID:              types.UID(volume.podName),
-					VolumeName:          volume.volumeName,
-					Mounter:             volume.mounter,
-					BlockVolumeMapper:   volume.blockVolumeMapper,
-					OuterVolumeSpecName: volume.outerVolumeSpecName,
-					VolumeGidVolume:     volume.volumeGidValue,
-					VolumeSpec:          volume.volumeSpec,
-					VolumeMountState:    operationexecutor.VolumeMountUncertain,
-				}
-				err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts)
+			if volumeAdded {
 				uncertainVolumeCount += 1
 				if err != nil {
 					klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod))
 					continue
 				}
-				klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
+				klog.V(4).InfoS("Volume is marked as mounted in uncertain state and added to the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName)
 			}
 		}
 
@@ -312,19 +312,13 @@ func (rc *reconciler) processReconstructedVolumes() {
 				klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName)
 				continue
 			}
-			currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName)
-			if currentMountState == operationexecutor.DeviceNotMounted {
-				err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath)
-				if err != nil {
-					klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName)
-					continue
-				}
-				klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName)
+			deviceMounted := rc.actualStateOfWorld.CheckAndMarkDeviceUncertainViaReconstruction(glblVolumeInfo.volumeName, deviceMountPath)
+			if !deviceMounted {
+				klog.V(3).InfoS("Could not mark device as mounted in uncertain state", "volumeName", glblVolumeInfo.volumeName)
 			}
 		}
 	}
 	}
-	rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo)
 }
 
 func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
index c76751edafa..c69004e03cc 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
@@ -2202,6 +2202,28 @@ func getFakeNode() *v1.Node {
 	}
 }
 
+func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: podName,
+			UID:  k8stypes.UID(podUUID),
+		},
+		Spec: v1.PodSpec{
+			Volumes: []v1.Volume{
+				{
+					Name: outerName,
+					VolumeSource: v1.VolumeSource{
+						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+							PDName: innerName,
+						},
+					},
+				},
+			},
+		},
+	}
+	return pod
+}
+
 func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) {
 	node := getFakeNode()
 	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
@@ -2239,11 +2261,23 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Recon
 }
 
 func TestSyncStates(t *testing.T) {
+	type podInfo struct {
+		podName         string
+		podUID          string
+		outerVolumeName string
+		innerVolumeName string
+	}
+	defaultPodInfo := podInfo{
+		podName:         "pod1",
+		podUID:          "pod1uid",
+		outerVolumeName: "volume-name",
+		innerVolumeName: "volume-name",
+	}
 	tests := []struct {
 		name                 string
 		volumePaths          []string
 		createMountPoint     bool
-		addToDSOW            bool
+		podInfos             []podInfo
 		postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
 		verifyFunc           func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
 	}{
@@ -2254,6 +2288,7 @@ func TestSyncStates(t *testing.T) {
 				path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
 			},
 			createMountPoint: true,
+			podInfos:         []podInfo{},
 			verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
 				mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
 				if len(mountedPods) != 2 {
@@ -2262,12 +2297,33 @@ func TestSyncStates(t *testing.T) {
 				return nil
 			},
 		},
+		{
+			name: "when two pods are using same volume and one of them is deleted",
+			volumePaths: []string{
+				path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
+				path.Join("pod2uid", "volumes", "fake-plugin", "volume-name"),
+			},
+			createMountPoint: true,
+			podInfos:         []podInfo{defaultPodInfo},
+			verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
+				// the volume used by the deleted pod (pod2uid) is reconstructed and considered mounted
+				mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
+				if len(mountedPods) != 1 {
+					return fmt.Errorf("expected 1 pod in asw, got %d", len(mountedPods))
+				}
+				if types.UniquePodName("pod2uid") != mountedPods[0].PodName {
+					return fmt.Errorf("expected mounted pod to be %s, got %s", "pod2uid", mountedPods[0].PodName)
+				}
+				return nil
+			},
+		},
 		{
 			name: "when reconstruction fails for a volume, volumes should be cleaned up",
 			volumePaths: []string{
 				path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
 			},
 			createMountPoint: false,
+			podInfos:         []podInfo{},
 			verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
 				return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) {
 					err := volumetesting.VerifyTearDownCallCount(1, fakePlugin)
@@ -2284,7 +2340,7 @@ func TestSyncStates(t *testing.T) {
 				path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"),
 			},
 			createMountPoint: true,
-			addToDSOW:        true,
+			podInfos:         []podInfo{defaultPodInfo},
 			postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
 				skippedVolumes := rcInstance.skippedDuringReconstruction
 				if len(skippedVolumes) != 1 {
@@ -2303,6 +2359,22 @@ func TestSyncStates(t *testing.T) {
 				if !addedViaReconstruction {
 					return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName)
 				}
+
+				// check device mount state
+				attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes()
+				if len(attachedVolumes) != 1 {
+					return fmt.Errorf("expected 1 attached volume, got %d", len(attachedVolumes))
+				}
+				firstAttachedVolume := attachedVolumes[0]
+				if !firstAttachedVolume.DeviceMayBeMounted() {
+					return fmt.Errorf("expected device for volume %s to be mounted in uncertain state", firstAttachedVolume.VolumeName)
+				}
+
+				// the skippedDuringReconstruction map should also be empty now
+				skippedVolumes := rcInstance.skippedDuringReconstruction
+				if len(skippedVolumes) > 0 {
+					return fmt.Errorf("expected 0 volumes in skippedDuringReconstruction, found %d", len(skippedVolumes))
+				}
 				return nil
 			},
 		},
@@ -2315,28 +2387,6 @@ func TestSyncStates(t *testing.T) {
 			}
 			defer os.RemoveAll(tmpKubeletDir)
 
-			pod := &v1.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "pod1",
-					UID:  "pod1uid",
-				},
-				Spec: v1.PodSpec{
-					Volumes: []v1.Volume{
-						{
-							Name: "volume-name",
-							VolumeSource: v1.VolumeSource{
-								GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
-									PDName: "volume-name",
-								},
-							},
-						},
-					},
-				},
-			}
-
-			volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
-			podName := util.GetUniquePodName(pod)
-
 			// create kubelet pod directory
 			tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
 			os.MkdirAll(tmpKubeletPodDir, 0755)
@@ -2355,7 +2405,10 @@ func TestSyncStates(t *testing.T) {
 			rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths)
 			rcInstance, _ := rc.(*reconciler)
 
-			if tc.addToDSOW {
+			for _, tpodInfo := range tc.podInfos {
+				pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
+				volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
+				podName := util.GetUniquePodName(pod)
 				volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
 					podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
 				if err != nil {
diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go
index 8484aa31c76..344ac32023c 100644
--- a/pkg/volume/testing/testing.go
+++ b/pkg/volume/testing/testing.go
@@ -1660,8 +1660,8 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume
 	return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
 }
 
-func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) {
-	plugins := ProbeVolumePlugins(VolumeConfig{})
+func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*volume.VolumePluginMgr, *FakeVolumePlugin) {
+	plugins := ProbeVolumePlugins(volume.VolumeConfig{})
 	v := NewFakeKubeletVolumeHost(
 		t,
 		rootDir, /* rootDir */
diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go
index 541d86e2ace..13079bf1581 100644
--- a/pkg/volume/util/operationexecutor/operation_executor.go
+++ b/pkg/volume/util/operationexecutor/operation_executor.go
@@ -216,9 +216,18 @@ type ActualStateOfWorldMounterUpdater interface {
 	// volume expansion must not be retried for this volume
 	MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
 
-	// AddVolumeViaReconstruction adds the volume to actual state of the world and also
-	// marks the volume as one found during reconstruction.
-	AddVolumeViaReconstruction(opts MarkVolumeOpts) error
+	// CheckAndMarkVolumeAsUncertainViaReconstruction adds the volume to the actual state of the
+	// world only if the volume was not already there. This avoids overwriting any previously
+	// stored state. It returns an error if adding the volume to the ASOW fails.
+	// It returns true if this operation resulted in the volume being added to the ASOW;
+	// otherwise it returns false.
+	CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error)
+
+	// CheckAndMarkDeviceUncertainViaReconstruction adds the device to the actual state of the
+	// world only if the device was not already there. This avoids overwriting any previously
+	// stored state. We only supply deviceMountPath because devicePath is already determined by
+	// the VerifyControllerAttachedVolume function.
+	CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool
 
 	// IsVolumeReconstructed returns true if volume currently added to actual state of the world
 	// was found during reconstruction.
diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go
index 71ca9ce40d5..fe04932c4dd 100644
--- a/pkg/volume/util/operationexecutor/operation_generator.go
+++ b/pkg/volume/util/operationexecutor/operation_generator.go
@@ -788,7 +788,7 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount,
 	// if volume was previously reconstructed we are not going to change its state as unmounted even
 	// if mount operation fails.
 	if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
-		klog.V(3).Infof("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
+		klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
 		return
 	}
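For reviewers who want the new interface contract at a glance: the sketch below is a minimal, self-contained illustration of the check-then-mark pattern these two methods follow (state is written only when nothing was recorded before, so reconstruction never overwrites fresher information, and the device is marked uncertain at most once). Everything in it — the markOpts and fakeASW types, names, and paths — is a simplified stand-in invented for illustration, not the kubelet's actual implementation.

// Illustrative sketch only; not kubelet code.
package main

import "fmt"

// markOpts is a simplified placeholder for operationexecutor.MarkVolumeOpts.
type markOpts struct {
	PodName    string
	VolumeName string
}

// fakeASW is a toy, in-memory stand-in for the actual state of world. It only
// tracks which pod/volume pairs and which devices were marked uncertain,
// loosely mirroring the foundDuringReconstruction bookkeeping.
type fakeASW struct {
	podsByVolume    map[string]map[string]bool
	deviceUncertain map[string]bool
}

func newFakeASW() *fakeASW {
	return &fakeASW{
		podsByVolume:    map[string]map[string]bool{},
		deviceUncertain: map[string]bool{},
	}
}

// CheckAndMarkVolumeAsUncertainViaReconstruction records the pod/volume pair in
// uncertain state only if nothing was recorded before; it never overwrites.
func (a *fakeASW) CheckAndMarkVolumeAsUncertainViaReconstruction(opts markOpts) (bool, error) {
	pods, ok := a.podsByVolume[opts.VolumeName]
	if !ok {
		pods = map[string]bool{}
		a.podsByVolume[opts.VolumeName] = pods
	}
	if pods[opts.PodName] {
		return false, nil // existing state wins
	}
	pods[opts.PodName] = true
	return true, nil
}

// CheckAndMarkDeviceUncertainViaReconstruction marks the device uncertain at most once.
func (a *fakeASW) CheckAndMarkDeviceUncertainViaReconstruction(volumeName, deviceMountPath string) bool {
	if a.deviceUncertain[volumeName] {
		return false
	}
	a.deviceUncertain[volumeName] = true
	return true
}

func main() {
	asw := newFakeASW()
	pods := []markOpts{
		{PodName: "pod1", VolumeName: "vol-1"},
		{PodName: "pod2", VolumeName: "vol-1"},
	}
	uncertain := 0
	for _, opts := range pods {
		if added, err := asw.CheckAndMarkVolumeAsUncertainViaReconstruction(opts); err == nil && added {
			uncertain++
		}
	}
	// Only touch device state when at least one pod was actually added.
	if uncertain > 0 {
		fmt.Println("device marked uncertain:", asw.CheckAndMarkDeviceUncertainViaReconstruction("vol-1", "/fake/deviceMountPath"))
	}
}

The real methods additionally require the volume to already be marked as attached and consult the per-pod volumeMountStateForPod, as in the actual_state_of_world.go hunk above.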