diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 33d58ddd019..40c7d943c7e 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -350,23 +350,6 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } -func (asw *actualStateOfWorld) AddVolumeViaReconstruction(opts operationexecutor.MarkVolumeOpts) error { - err := asw.MarkVolumeAsMounted(opts) - if err != nil { - return err - } - asw.Lock() - defer asw.Unlock() - - podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] - if !ok { - podMap = map[volumetypes.UniquePodName]types.UID{} - } - podMap[opts.PodName] = opts.PodUID - asw.foundDuringReconstruction[opts.VolumeName] = podMap - return nil -} - func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { volumeState := asw.GetVolumeMountState(volumeName, podName) @@ -385,6 +368,84 @@ func (asw *actualStateOfWorld) IsVolumeReconstructed(volumeName v1.UniqueVolumeN return foundPod } +func (asw *actualStateOfWorld) CheckAndMarkVolumeAsUncertainViaReconstruction(opts operationexecutor.MarkVolumeOpts) (bool, error) { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[opts.VolumeName] + if !volumeExists { + return false, nil + } + + podObj, podExists := volumeObj.mountedPods[opts.PodName] + if podExists { + // if volume mount was uncertain we should keep trying to unmount the volume + if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { + return false, nil + } + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { + return false, nil + } + } + + podName := opts.PodName + podUID := opts.PodUID + volumeName := opts.VolumeName + mounter := opts.Mounter + blockVolumeMapper := opts.BlockVolumeMapper + outerVolumeSpecName := opts.OuterVolumeSpecName + volumeGidValue := opts.VolumeGidVolume + volumeSpec := opts.VolumeSpec + + podObj = mountedPod{ + podName: podName, + podUID: podUID, + mounter: mounter, + blockVolumeMapper: blockVolumeMapper, + outerVolumeSpecName: outerVolumeSpecName, + volumeGidValue: volumeGidValue, + volumeSpec: volumeSpec, + remountRequired: false, + volumeMountStateForPod: operationexecutor.VolumeMountUncertain, + } + + if mounter != nil { + // The mounter stored in the object may have old information, + // use the newest one. + podObj.mounter = mounter + } + + asw.attachedVolumes[volumeName].mountedPods[podName] = podObj + + podMap, ok := asw.foundDuringReconstruction[opts.VolumeName] + if !ok { + podMap = map[volumetypes.UniquePodName]types.UID{} + } + podMap[opts.PodName] = opts.PodUID + asw.foundDuringReconstruction[opts.VolumeName] = podMap + return true, nil +} + +func (asw *actualStateOfWorld) CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool { + asw.Lock() + defer asw.Unlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + // CheckAndMarkDeviceUncertainViaReconstruction requires volume to be marked as attached, so if + // volume does not exist in ASOW or is in any state other than DeviceNotMounted we should return + if !volumeExists || volumeObj.deviceMountState != operationexecutor.DeviceNotMounted { + return false + } + + volumeObj.deviceMountState = operationexecutor.DeviceMountUncertain + // we are only changing deviceMountPath because devicePath at at this stage is + // determined from node object. + volumeObj.deviceMountPath = deviceMountPath + asw.attachedVolumes[volumeName] = volumeObj + return true + +} + func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error { return asw.AddPodToVolume(markVolumeOpts) } diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 076157357d5..019cfabf342 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -460,6 +460,8 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeMountedElsewhere(t, podName2, generatedVolumeName2, true /*expectedMountedElsewhere */, asw) } +// Test if volumes that were recorded to be read from disk during reconstruction +// are handled correctly by the ASOW. func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { tests := []struct { name string @@ -555,7 +557,7 @@ func TestActualStateOfWorld_FoundDuringReconstruction(t *testing.T) { VolumeSpec: volumeSpec1, VolumeMountState: operationexecutor.VolumeMountUncertain, } - err = asw.AddVolumeViaReconstruction(markVolumeOpts1) + _, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts1) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index fe4c1c0ace5..08efe0fe0cb 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -181,7 +181,6 @@ func (rc *reconciler) reconcile() { // After running the above operations if skippedDuringReconstruction is not empty // then ensure that all volumes which were discovered and skipped during reconstruction // are added to actualStateOfWorld in uncertain state. - // This should be called only ONCE after reconstruction. if len(rc.skippedDuringReconstruction) > 0 { rc.processReconstructedVolumes() } @@ -265,42 +264,43 @@ func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, po // But if mount operation fails for some reason then we still need to mark the volume as uncertain // and wait for the next reconciliation loop to deal with it. func (rc *reconciler) processReconstructedVolumes() { - if rc.kubeClient != nil { - rc.updateDevicePath(rc.skippedDuringReconstruction) - } for volumeName, glblVolumeInfo := range rc.skippedDuringReconstruction { // check if volume is marked as attached to the node // for now lets only process volumes which are at least known as attached to the node // this should help with most volume types (including secret, configmap etc) if !rc.actualStateOfWorld.VolumeExists(volumeName) { klog.V(4).InfoS("Volume is not marked as attached to the node. Skipping processing of the volume", "volumeName", volumeName) - delete(rc.skippedDuringReconstruction, volumeName) continue } uncertainVolumeCount := 0 + // only delete volumes which were marked as attached here. + // This should ensure that - we will wait for volumes which were not marked as attached + // before adding them in uncertain state during reconstruction. + delete(rc.skippedDuringReconstruction, volumeName) for podName, volume := range glblVolumeInfo.podVolumes { - volumeNotMounted := rc.actualStateOfWorld.PodRemovedFromVolume(podName, volume.volumeName) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + + volumeAdded, err := rc.actualStateOfWorld.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts) + // if volume is not mounted then lets mark volume mounted in uncertain state in ASOW - if volumeNotMounted { - markVolumeOpts := operationexecutor.MarkVolumeOpts{ - PodName: volume.podName, - PodUID: types.UID(volume.podName), - VolumeName: volume.volumeName, - Mounter: volume.mounter, - BlockVolumeMapper: volume.blockVolumeMapper, - OuterVolumeSpecName: volume.outerVolumeSpecName, - VolumeGidVolume: volume.volumeGidValue, - VolumeSpec: volume.volumeSpec, - VolumeMountState: operationexecutor.VolumeMountUncertain, - } - err := rc.actualStateOfWorld.AddVolumeViaReconstruction(markVolumeOpts) + if volumeAdded { uncertainVolumeCount += 1 if err != nil { klog.ErrorS(err, "Could not add pod to volume information to actual state of world", "pod", klog.KObj(volume.pod)) continue } - klog.V(4).InfoS("Volume is marked as mounted and added into the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) + klog.V(4).InfoS("Volume is marked as mounted in uncertain state and added to the actual state", "pod", klog.KObj(volume.pod), "podName", volume.podName, "volumeName", volume.volumeName) } } @@ -312,19 +312,13 @@ func (rc *reconciler) processReconstructedVolumes() { klog.ErrorS(err, "Could not find device mount path for volume", "volumeName", glblVolumeInfo.volumeName) continue } - currentMountState := rc.actualStateOfWorld.GetDeviceMountState(glblVolumeInfo.volumeName) - if currentMountState == operationexecutor.DeviceNotMounted { - err = rc.actualStateOfWorld.MarkDeviceAsUncertain(glblVolumeInfo.volumeName, glblVolumeInfo.devicePath, deviceMountPath) - if err != nil { - klog.ErrorS(err, "Could not mark device is mounted to actual state of world", "volume", glblVolumeInfo.volumeName) - continue - } - klog.V(4).InfoS("Volume is marked device as mounted and added into the actual state", "volumeName", glblVolumeInfo.volumeName) + deviceMounted := rc.actualStateOfWorld.CheckAndMarkDeviceUncertainViaReconstruction(glblVolumeInfo.volumeName, deviceMountPath) + if !deviceMounted { + klog.V(3).InfoS("Could not mark device as mounted in uncertain state", "volumeName", glblVolumeInfo.volumeName) } } } } - rc.skippedDuringReconstruction = make(map[v1.UniqueVolumeName]*globalVolumeInfo) } func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) { diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index c76751edafa..c69004e03cc 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -2202,6 +2202,28 @@ func getFakeNode() *v1.Node { } } +func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + UID: k8stypes.UID(podUUID), + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: outerName, + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: innerName, + }, + }, + }, + }, + }, + } + return pod +} + func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Reconciler, *volumetesting.FakeVolumePlugin) { node := getFakeNode() volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir) @@ -2239,11 +2261,23 @@ func getReconciler(kubeletDir string, t *testing.T, volumePaths []string) (Recon } func TestSyncStates(t *testing.T) { + type podInfo struct { + podName string + podUID string + outerVolumeName string + innerVolumeName string + } + defaultPodInfo := podInfo{ + podName: "pod1", + podUID: "pod1uid", + outerVolumeName: "volume-name", + innerVolumeName: "volume-name", + } tests := []struct { name string volumePaths []string createMountPoint bool - addToDSOW bool + podInfos []podInfo postSyncStatCallback func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error }{ @@ -2254,6 +2288,7 @@ func TestSyncStates(t *testing.T) { path.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"), }, createMountPoint: true, + podInfos: []podInfo{}, verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes() if len(mountedPods) != 2 { @@ -2262,12 +2297,33 @@ func TestSyncStates(t *testing.T) { return nil }, }, + { + name: "when two pods are using same volume and one of them is deleted", + volumePaths: []string{ + path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"), + path.Join("pod2uid", "volumes", "fake-plugin", "volume-name"), + }, + createMountPoint: true, + podInfos: []podInfo{defaultPodInfo}, + verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { + // for pod that is deleted, volume is considered as mounted + mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes() + if len(mountedPods) != 1 { + return fmt.Errorf("expected 1 pods to in asw got %d", len(mountedPods)) + } + if types.UniquePodName("pod2uid") != mountedPods[0].PodName { + return fmt.Errorf("expected mounted pod to be %s got %s", "pod2uid", mountedPods[0].PodName) + } + return nil + }, + }, { name: "when reconstruction fails for a volume, volumes should be cleaned up", volumePaths: []string{ path.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"), }, createMountPoint: false, + podInfos: []podInfo{}, verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { return retryWithExponentialBackOff(reconcilerSyncWaitDuration, func() (bool, error) { err := volumetesting.VerifyTearDownCallCount(1, fakePlugin) @@ -2284,7 +2340,7 @@ func TestSyncStates(t *testing.T) { path.Join("pod1uid", "volumes", "fake-plugin", "volume-name"), }, createMountPoint: true, - addToDSOW: true, + podInfos: []podInfo{defaultPodInfo}, postSyncStatCallback: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error { skippedVolumes := rcInstance.skippedDuringReconstruction if len(skippedVolumes) != 1 { @@ -2303,6 +2359,22 @@ func TestSyncStates(t *testing.T) { if !addedViaReconstruction { return fmt.Errorf("expected volume %s to be marked as added via reconstruction", mountedPodVolume.VolumeName) } + + // check device mount state + attachedVolumes := rcInstance.actualStateOfWorld.GetAttachedVolumes() + if len(attachedVolumes) != 1 { + return fmt.Errorf("expected 1 volume to be unmounted, got %d", len(attachedVolumes)) + } + firstAttachedVolume := attachedVolumes[0] + if !firstAttachedVolume.DeviceMayBeMounted() { + return fmt.Errorf("expected %s volume to be mounted in uncertain state", firstAttachedVolume.VolumeName) + } + + // also skippedVolumes map should be empty + skippedVolumes := rcInstance.skippedDuringReconstruction + if len(skippedVolumes) > 0 { + return fmt.Errorf("expected 0 pods in skipped volumes found %d", len(skippedVolumes)) + } return nil }, }, @@ -2315,28 +2387,6 @@ func TestSyncStates(t *testing.T) { } defer os.RemoveAll(tmpKubeletDir) - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: "volume-name", - VolumeSource: v1.VolumeSource{ - GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ - PDName: "volume-name", - }, - }, - }, - }, - }, - } - - volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} - podName := util.GetUniquePodName(pod) - // create kubelet pod directory tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods") os.MkdirAll(tmpKubeletPodDir, 0755) @@ -2355,7 +2405,10 @@ func TestSyncStates(t *testing.T) { rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths) rcInstance, _ := rc.(*reconciler) - if tc.addToDSOW { + for _, tpodInfo := range tc.podInfos { + pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName) + volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} + podName := util.GetUniquePodName(pod) volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume( podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) if err != nil { diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 8484aa31c76..344ac32023c 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -1660,8 +1660,8 @@ func GetTestKubeletVolumePluginMgrWithNode(t *testing.T, node *v1.Node) (*volume return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin) } -func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*VolumePluginMgr, *FakeVolumePlugin) { - plugins := ProbeVolumePlugins(VolumeConfig{}) +func GetTestKubeletVolumePluginMgrWithNodeAndRoot(t *testing.T, node *v1.Node, rootDir string) (*volume.VolumePluginMgr, *FakeVolumePlugin) { + plugins := ProbeVolumePlugins(volume.VolumeConfig{}) v := NewFakeKubeletVolumeHost( t, rootDir, /* rootDir */ diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 541d86e2ace..13079bf1581 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -216,9 +216,18 @@ type ActualStateOfWorldMounterUpdater interface { // volume expansion must not be retried for this volume MarkForInUseExpansionError(volumeName v1.UniqueVolumeName) - // AddVolumeViaReconstruction adds the volume to actual state of the world and also - // marks the volume as one found during reconstruction. - AddVolumeViaReconstruction(opts MarkVolumeOpts) error + // CheckAndMarkVolumeAsUncertainViaReconstruction only adds volume to actual state of the world + // if volume was not already there. This avoid overwriting in any previously stored + // state. It returns error if there was an error adding the volume to ASOW. + // It returns true, if this operation resulted in volume being added to ASOW + // otherwise it returns false. + CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error) + + // CheckAndMarkDeviceUncertainViaReconstruction only adds device to actual state of the world + // if device was not already there. This avoids overwriting in any previously stored + // state. We only supply deviceMountPath because devicePath is already determined from + // VerifyControllerAttachedVolume function. + CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool // IsVolumeReconstructed returns true if volume currently added to actual state of the world // was found during reconstruction. diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 71ca9ce40d5..fe04932c4dd 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -788,7 +788,7 @@ func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, // if volume was previously reconstructed we are not going to change its state as unmounted even // if mount operation fails. if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) { - klog.V(3).Infof("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName) + klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName) return }