diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index c279b9f862a..33d58ddd019 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -227,7 +227,9 @@ type actualStateOfWorld struct { // state by default. // The key in this map is the name of the volume and the value is an object // containing more information about the attached volume. - attachedVolumes map[v1.UniqueVolumeName]attachedVolume + attachedVolumes map[v1.UniqueVolumeName]attachedVolume + // foundDuringReconstruction is a map of volumes which were discovered + // from kubelet root directory when kubelet was restarted. foundDuringReconstruction map[v1.UniqueVolumeName]map[volumetypes.UniquePodName]types.UID // volumePluginMgr is the volume plugin manager used to create volume @@ -366,14 +368,15 @@ func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor } func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { - asw.RLock() - defer asw.RUnlock() - volumeState := asw.GetVolumeMountState(volumeName, podName) + // only uncertain volumes are reconstructed if volumeState != operationexecutor.VolumeMountUncertain { return false } + + asw.RLock() + defer asw.RUnlock() podMap, ok := asw.foundDuringReconstruction[volumeName] if !ok { return false @@ -571,6 +574,11 @@ func (asw *actualStateOfWorld) AddPodToVolume(markVolumeOpts operationexecutor.M // If pod exists, reset remountRequired value podObj.remountRequired = false podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState + + // if volume is mounted successfully, then it should be removed from foundDuringReconstruction map + if markVolumeOpts.VolumeMountState == operationexecutor.VolumeMounted { + delete(asw.foundDuringReconstruction[volumeName], podName) + } if mounter != nil { // The mounter 
stored in the object may have old information, // use the newest one. diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index cd9fe6c52c8..076157357d5 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -17,7 +17,9 @@ limitations under the License. package cache import ( + "fmt" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "testing" "github.com/stretchr/testify/require" @@ -458,6 +460,149 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw) } +func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { + tests := []struct { + name string + opCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error + verifyCallback func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error + }{ + { + name: "marking volume mounted should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + volumeOpts.VolumeMountState = operationexecutor.VolumeMounted + return asw.MarkVolumeAsMounted(volumeOpts) + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + return nil + }, + }, + { + name: "removing volume from pod should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + return asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName) + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts
operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + return nil + }, + }, + { + name: "removing volume entirely from ASOW should remove volume from found during reconstruction", + opCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + err := asw.MarkVolumeAsUnmounted(volumeOpts.PodName, volumeOpts.VolumeName) + if err != nil { + return err + } + asw.MarkVolumeAsDetached(volumeOpts.VolumeName, "") + return nil + }, + verifyCallback: func(asw ActualStateOfWorld, volumeOpts operationexecutor.MarkVolumeOpts) error { + ok := asw.IsVolumeReconstructed(volumeOpts.VolumeName, volumeOpts.PodName) + if ok { + return fmt.Errorf("found unexpected volume in reconstructed volume list") + } + aswInstance, _ := asw.(*actualStateOfWorld) + _, found := aswInstance.foundDuringReconstruction[volumeOpts.VolumeName] + if found { + return fmt.Errorf("found unexpected volume in reconstructed map") + } + return nil + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + volumePluginMgr, plugin := volumetesting.GetTestKubeletVolumePluginMgr(t) + asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr) + devicePath := "fake/device/path" + + pod1 := getTestPod("pod1", "pod1uid", "volume-name-1", "fake-device1") + volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]} + generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec( + plugin, volumeSpec1) + require.NoError(t, err) + + err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) + if err != nil { + t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err) + } + podName1 := util.GetUniquePodName(pod1) + + mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewMounter failed.
Expected: <no error> Actual: <%v>", err) + } + + mapper1, err := plugin.NewBlockVolumeMapper(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err) + } + + markVolumeOpts1 := operationexecutor.MarkVolumeOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + BlockVolumeMapper: mapper1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err = asw.AddVolumeViaReconstruction(markVolumeOpts1) + if err != nil { + t.Fatalf("AddVolumeViaReconstruction failed. Expected: <no error> Actual: <%v>", err) + } + // sanity-check the reconstructed state before running the operation under test + verifyVolumeExistsAsw(t, generatedVolumeName1, true /* shouldExist */, asw) + verifyVolumeDoesntExistInUnmountedVolumes(t, generatedVolumeName1, asw) + verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName1, asw) + verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName1, volumeSpec1.Name(), asw) + verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw) + verifyVolumeFoundInReconstruction(t, podName1, generatedVolumeName1, asw) + + if tc.opCallback != nil { + err = tc.opCallback(asw, markVolumeOpts1) + if err != nil { + t.Fatalf("for test %s: %v", tc.name, err) + } + } + err = tc.verifyCallback(asw, markVolumeOpts1) + if err != nil { + t.Fatalf("for test %s verification failed: %v", tc.name, err) + } + }) + } +} + +func getTestPod(podName, podUID, outerVolumeName, pdName string) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + UID: types.UID(podUID), + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: outerVolumeName, + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: pdName, + }, + }, + }, + }, + }, + } + return pod +} + // Calls AddPodToVolume() to add pod to empty data struct // Verifies call fails with "volume does not
exist" error. func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) { @@ -873,3 +1018,12 @@ func verifyVolumeSpecNameInVolumeAsw( } } } + +// verifyVolumeFoundInReconstruction fails the test immediately unless asw reports +// volumeToCheck as reconstructed for podToCheck. +func verifyVolumeFoundInReconstruction(t *testing.T, podToCheck volumetypes.UniquePodName, volumeToCheck v1.UniqueVolumeName, asw ActualStateOfWorld) { + isReconstructed := asw.IsVolumeReconstructed(volumeToCheck, podToCheck) + if !isReconstructed { + t.Fatalf("ASW IsVolumeReconstructed result invalid. expected <true> Actual <false>") + } +} diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index a9b9a729298..fe4c1c0ace5 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -178,6 +178,10 @@ func (rc *reconciler) reconcile() { // Ensure devices that should be detached/unmounted are detached/unmounted. rc.unmountDetachDevices() + // After running the above operations if skippedDuringReconstruction is not empty + // then ensure that all volumes which were discovered and skipped during reconstruction + // are added to actualStateOfWorld in uncertain state. + // This should be called only ONCE after reconstruction.
if len(rc.skippedDuringReconstruction) > 0 { rc.processReconstructedVolumes() } @@ -279,7 +283,18 @@ func (rc *reconciler) processReconstructedVolumes() { volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName) // if volume is not mounted then lets mark volume mounted in uncertain state in ASOW if volumeNotMounted { - err := rc.markVolumeState(volume, operationexecutor.VolumeMountUncertain) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts) uncertainVolumeCount += 1 if err != nil { klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) @@ -429,6 +444,8 @@ type reconstructedVolume struct { blockVolumeMapper volumepkg.BlockVolumeMapper } +// globalVolumeInfo stores reconstructed volume information +// for each pod that was using that volume. 
type globalVolumeInfo struct { volumeName v1.UniqueVolumeName volumeSpec *volumepkg.Spec diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 1a488080943..c76751edafa 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -19,14 +19,15 @@ package reconciler import ( "crypto/md5" "fmt" - csitrans "k8s.io/csi-translation-lib" - "k8s.io/kubernetes/pkg/volume/csimigration" "os" "path" "path/filepath" "testing" "time" + csitrans "k8s.io/csi-translation-lib" + "k8s.io/kubernetes/pkg/volume/csimigration" + "github.com/stretchr/testify/assert" "k8s.io/mount-utils" @@ -2297,6 +2298,11 @@ func TestSyncStates(t *testing.T) { if len(mountedPods) != 1 { return fmt.Errorf("expected 1 pods to in mounted volume list got %d", len(mountedPods)) } + mountedPodVolume := mountedPods[0] + addedViaReconstruction := rcInstance.actualStateOfWorld.IsVolumeReconstructed(mountedPodVolume.VolumeName, mountedPodVolume.PodName) + if !addedViaReconstruction { + return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName) + } return nil }, },