diff --git a/pkg/kubelet/kubelet_volumes.go b/pkg/kubelet/kubelet_volumes.go
index 9396a5c62c4..0ec1e5e9f9c 100644
--- a/pkg/kubelet/kubelet_volumes.go
+++ b/pkg/kubelet/kubelet_volumes.go
@@ -71,10 +71,10 @@ func (kl *Kubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.B
 }

 // podVolumesExist checks with the volume manager and returns true any of the
-// pods for the specified volume are mounted.
+// volumes for the specified pod are mounted or are uncertain.
 func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
     if mountedVolumes :=
-        kl.volumeManager.GetMountedVolumesForPod(
+        kl.volumeManager.GetPossiblyMountedVolumesForPod(
             volumetypes.UniquePodName(podUID)); len(mountedVolumes) > 0 {
         return true
     }
diff --git a/pkg/kubelet/kubelet_volumes_test.go b/pkg/kubelet/kubelet_volumes_test.go
index 0f4117c3a58..6b7128b4f4a 100644
--- a/pkg/kubelet/kubelet_volumes_test.go
+++ b/pkg/kubelet/kubelet_volumes_test.go
@@ -397,6 +397,9 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {

     podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
         util.GetUniquePodName(pod))
+    allPodVolumes := kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
+        util.GetUniquePodName(pod))
+    assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

     expectedPodVolumes := []string{"vol1"}
     assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
@@ -476,6 +479,9 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {

     podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
         util.GetUniquePodName(pod))
+    allPodVolumes := kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
+        util.GetUniquePodName(pod))
+    assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

     expectedPodVolumes := []string{"vol1"}
     assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
@@ -500,6 +506,9 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
     // Verify volumes unmounted
     podVolumes = kubelet.volumeManager.GetMountedVolumesForPod(
         util.GetUniquePodName(pod))
+    allPodVolumes = kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
+        util.GetUniquePodName(pod))
+    assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

     assert.Len(t, podVolumes, 0,
         "Expected volumes to be unmounted and detached. But some volumes are still mounted: %#v", podVolumes)
diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
index 70a4c3d3409..db0ad30dc83 100644
--- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
+++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go
@@ -110,6 +110,14 @@ type ActualStateOfWorld interface {
     // volumes that do not need to update contents should not fail.
     PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error)

+    // PodRemovedFromVolume returns true if the given pod does not exist in the list of
+    // mountedPods for the given volume in the cache, indicating that the pod has
+    // fully unmounted the volume or never mounted it.
+    // If the volume is fully mounted or is in an uncertain mount state for the pod,
+    // the pod is considered to still exist in the volume manager's actual state of
+    // the world and false is returned.
+ PodRemovedFromVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) bool + // VolumeExistsWithSpecName returns true if the given volume specified with the // volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of // volumes that should be attached to this node. @@ -136,6 +144,11 @@ type ActualStateOfWorld interface { // current actual state of the world. GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume + // GetPossiblyMountedVolumesForPod generates and returns a list of volumes for + // the specified pod that either are attached and mounted or are "uncertain", + // i.e. a volume plugin may be mounting the volume right now. + GetPossiblyMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume + // GetGloballyMountedVolumes generates and returns a list of all attached // volumes that are globally mounted. This list can be used to determine // which volumes should be reported as "in use" in the node's VolumesInUse @@ -681,6 +694,31 @@ func (asw *actualStateOfWorld) PodExistsInVolume( return podExists, volumeObj.devicePath, nil } +func (asw *actualStateOfWorld) PodRemovedFromVolume( + podName volumetypes.UniquePodName, + volumeName v1.UniqueVolumeName) bool { + asw.RLock() + defer asw.RUnlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if !volumeExists { + return true + } + + podObj, podExists := volumeObj.mountedPods[podName] + if podExists { + // if volume mount was uncertain we should keep trying to unmount the volume + if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { + return false + } + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { + return false + } + } + + return true +} + func (asw *actualStateOfWorld) VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool { asw.RLock() defer asw.RUnlock() @@ -757,6 +795,26 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod( return mountedVolume } +func (asw *actualStateOfWorld) GetPossiblyMountedVolumesForPod( + podName volumetypes.UniquePodName) []MountedVolume { + asw.RLock() + defer asw.RUnlock() + mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) + for _, volumeObj := range asw.attachedVolumes { + for mountedPodName, podObj := range volumeObj.mountedPods { + if mountedPodName == podName && + (podObj.volumeMountStateForPod == operationexecutor.VolumeMounted || + podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain) { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } + } + } + + return mountedVolume +} + func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume { asw.RLock() defer asw.RUnlock() diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index 224a1e954dd..fc15804bdc8 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -20,7 +20,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" @@ -653,13 +653,28 @@ func TestUncertainVolumeMounts(t *testing.T) { } } if volumeFound { - t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name()) + t.Fatalf("expected volume %s to be 
not found in asw.GetMountedVolumesForPod", volumeSpec1.Name())
+    }
+
+    possiblyMountedVolumes := asw.GetPossiblyMountedVolumesForPod(podName1)
+    volumeFound = false
+    for _, volume := range possiblyMountedVolumes {
+        if volume.InnerVolumeSpecName == volumeSpec1.Name() {
+            volumeFound = true
+        }
+    }
+    if !volumeFound {
+        t.Fatalf("expected volume %s to be found in asw.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
    }
    volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1)
    if volExists {
        t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
    }
+    removed := asw.PodRemovedFromVolume(podName1, generatedVolumeName1)
+    if removed {
+        t.Fatalf("expected volume %s not to be removed in asw", generatedVolumeName1)
+    }
 }

 func verifyVolumeExistsInGloballyMountedVolumes(
diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
index 8300d9e08e9..f9c4f46f312 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
@@ -278,12 +278,12 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
            klog.V(4).InfoS("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod))
            continue
        }
-        exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        var volumeToMountSpecName string
        if volumeToMount.VolumeSpec != nil {
            volumeToMountSpecName = volumeToMount.VolumeSpec.Name()
        }
-        if !exists && podExists {
+        removed := dswp.actualStateOfWorld.PodRemovedFromVolume(volumeToMount.PodName, volumeToMount.VolumeName)
+        if removed && podExists {
            klog.V(4).InfoS("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
            continue
        }
diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
index 87a13368d78..006f2fa8c60 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go
@@ -156,6 +156,184 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
 }

 func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
+    dswp, fakeRuntime, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
+    podName := util.GetUniquePodName(pod)
+
+    // let the pod be terminated
+    podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
+    if !exist {
+        t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
+    }
+    podGet.Status.Phase = v1.PodFailed
+    dswp.podManager.DeletePod(pod)
+
+    fakeRuntime.PodList = []*containertest.FakePod{
+        {
+            Pod: &kubecontainer.Pod{
+                Name: pod.Name,
+                ID:   pod.UID,
+                Sandboxes: []*kubecontainer.Container{
+                    {
+                        Name: "dswp-test-pod-sandbox",
+                    },
+                },
+            },
+        },
+    }
+
+    dswp.findAndRemoveDeletedPods()
+
+    if !dswp.pods.processedPods[podName] {
+        t.Fatalf("Pod should not have been removed from desired state of world since the sandbox still exists")
+    }
+
+    fakeRuntime.PodList = nil
+
+    // fakeRuntime cannot get the pod, so findAndRemoveDeletedPods() will remove the pod and the volumes it has mounted
+    dswp.findAndRemoveDeletedPods()
+    if dswp.pods.processedPods[podName] {
+        t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
+    }
+
+    volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
+    if volumeExists {
+        t.Fatalf(
+            "VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
+            expectedVolumeName,
+            volumeExists)
+    }
+
+    if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
+        podName, expectedVolumeName); podExistsInVolume {
+        t.Fatalf(
+            "DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
+            podExistsInVolume)
+    }
+
+    volumesToMount := dswp.desiredStateOfWorld.GetVolumesToMount()
+    for _, volume := range volumesToMount {
+        if volume.VolumeName == expectedVolumeName {
+            t.Fatalf(
+                "Found volume %v in the list of desired state of world volumes to mount. Expected not",
+                expectedVolumeName)
+        }
+    }
+}
+
+func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
+    dswp, _, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
+    podName := util.GetUniquePodName(pod)
+
+    // let the pod be terminated
+    podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
+    if !exist {
+        t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
+    }
+    podGet.Status.Phase = v1.PodFailed
+
+    dswp.findAndRemoveDeletedPods()
+    // Although the pod status is terminated, the pod still exists in the pod manager and the actual
+    // state does not have this pod and volume information yet, so the desired state populator will
+    // not delete this pod and volume on the first pass.
+    volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
+    if !volumeExists {
+        t.Fatalf(
+            "VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
+            expectedVolumeName,
+            volumeExists)
+    }
+
+    if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
+        podName, expectedVolumeName); !podExistsInVolume {
+        t.Fatalf(
+            "DSW PodExistsInVolume returned incorrect value. Expected: <true> Actual: <%v>",
+            podExistsInVolume)
+    }
+
+    // reconcile with the actual state so that the volume is added into the actual state;
+    // the desired state populator can now successfully delete the pod and volume
+    fakeASW := dswp.actualStateOfWorld
+    reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)
+    dswp.findAndRemoveDeletedPods()
+    volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
+    if volumeExists {
+        t.Fatalf(
+            "VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
+            expectedVolumeName,
+            volumeExists)
+    }
+
+    if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
+        podName, expectedVolumeName); podExistsInVolume {
+        t.Fatalf(
+            "DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
+            podExistsInVolume)
+    }
+}
+
+func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
+    dswp, fakeRuntime, pod, expectedVolumeName, pv := prepareDSWPWithPodPV(t)
+    podName := util.GetUniquePodName(pod)
+
+    // let the pod be terminated
+    podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
+    if !exist {
+        t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
+    }
+    podGet.Status.Phase = v1.PodFailed
+    dswp.podManager.DeletePod(pod)
+    fakeRuntime.PodList = nil
+
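// Editor's illustrative sketch (not part of the patch): the removal decision that
// findAndRemoveDeletedPods now delegates to ActualStateOfWorld.PodRemovedFromVolume,
// reduced to the mount-state check exercised by the test below. The operationexecutor
// constants are the ones used in the hunks above; this standalone helper is hypothetical.
func podConsideredRemoved(state operationexecutor.VolumeMountState) bool {
	switch state {
	case operationexecutor.VolumeMounted, operationexecutor.VolumeMountUncertain:
		// Fully mounted or possibly mounted: keep the pod/volume pair in the
		// desired state so that unmount keeps being retried.
		return false
	default:
		// Any other state, or the pod missing from the volume's mountedPods
		// entirely: safe to drop the pod from the desired state of the world.
		return true
	}
}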
+    // Add the volume to ASW by reconciling.
+    fakeASW := dswp.actualStateOfWorld
+    reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)
+
+    // Mark the volume as uncertain
+    opts := operationexecutor.MarkVolumeOpts{
+        PodName:             util.GetUniquePodName(pod),
+        PodUID:              pod.UID,
+        VolumeName:          expectedVolumeName,
+        OuterVolumeSpecName: "dswp-test-volume-name",
+        VolumeGidVolume:     "",
+        VolumeSpec:          volume.NewSpecFromPersistentVolume(pv, false),
+        VolumeMountState:    operationexecutor.VolumeMountUncertain,
+    }
+    err := dswp.actualStateOfWorld.MarkVolumeMountAsUncertain(opts)
+    if err != nil {
+        t.Fatalf("Failed to set the volume as uncertain: %s", err)
+    }
+
+    // fakeRuntime cannot get the pod, so findAndRemoveDeletedPods() will remove the pod and the volumes it has mounted
+    dswp.findAndRemoveDeletedPods()
+    if dswp.pods.processedPods[podName] {
+        t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
+    }
+
+    volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
+    if volumeExists {
+        t.Fatalf(
+            "VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
+            expectedVolumeName,
+            volumeExists)
+    }
+
+    if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
+        podName, expectedVolumeName); podExistsInVolume {
+        t.Fatalf(
+            "DSW PodExistsInVolume returned incorrect value.
Expected: Actual: <%v>", - podExistsInVolume) - } - - volumesToMount := fakesDSW.GetVolumesToMount() - for _, volume := range volumesToMount { - if volume.VolumeName == expectedVolumeName { - t.Fatalf( - "Found volume %v in the list of desired state of world volumes to mount. Expected not", - expectedVolumeName) - } - } - -} - -func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { - // create dswp - mode := v1.PersistentVolumeFilesystem - pv := &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: "dswp-test-volume-name", - }, - Spec: v1.PersistentVolumeSpec{ - ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"}, - VolumeMode: &mode, - }, - } - pvc := &v1.PersistentVolumeClaim{ - Spec: v1.PersistentVolumeClaimSpec{ - VolumeName: "dswp-test-volume-name", - }, - Status: v1.PersistentVolumeClaimStatus{ - Phase: v1.ClaimBound, - }, - } - dswp, fakePodManager, fakesDSW, _ := createDswpWithVolume(t, pv, pvc) - - // create pod - containers := []v1.Container{ - { - VolumeMounts: []v1.VolumeMount{ - { - Name: "dswp-test-volume-name", - MountPath: "/mnt", - }, - }, - }, - } - pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) - - fakePodManager.AddPod(pod) - - podName := util.GetUniquePodName(pod) - - generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name - - dswp.findAndAddNewPods() - - if !dswp.pods.processedPods[podName] { - t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName) - } - - expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName) - - volumeExists := fakesDSW.VolumeExists(expectedVolumeName) - if !volumeExists { - t.Fatalf( - "VolumeExists(%q) failed. Expected: Actual: <%v>", - expectedVolumeName, - volumeExists) - } - - if podExistsInVolume := fakesDSW.PodExistsInVolume( - podName, expectedVolumeName); !podExistsInVolume { - t.Fatalf( - "DSW PodExistsInVolume returned incorrect value. Expected: Actual: <%v>", - podExistsInVolume) - } - - verifyVolumeExistsInVolumesToMount( - t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW) - - //let the pod be terminated - podGet, exist := fakePodManager.GetPodByName(pod.Namespace, pod.Name) - if !exist { - t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) - } - podGet.Status.Phase = v1.PodFailed - - dswp.findAndRemoveDeletedPods() - // Although Pod status is terminated, pod still exists in pod manager and actual state does not has this pod and volume information - // desired state populator will fail to delete this pod and volume first - volumeExists = fakesDSW.VolumeExists(expectedVolumeName) - if !volumeExists { - t.Fatalf( - "VolumeExists(%q) failed. Expected: Actual: <%v>", - expectedVolumeName, - volumeExists) - } - - if podExistsInVolume := fakesDSW.PodExistsInVolume( - podName, expectedVolumeName); !podExistsInVolume { - t.Fatalf( - "DSW PodExistsInVolume returned incorrect value. Expected: Actual: <%v>", - podExistsInVolume) - } - - // reconcile with actual state so that volume is added into the actual state - // desired state populator now can successfully delete the pod and volume - fakeASW := dswp.actualStateOfWorld - reconcileASW(fakeASW, fakesDSW, t) - dswp.findAndRemoveDeletedPods() - volumeExists = fakesDSW.VolumeExists(expectedVolumeName) - if volumeExists { - t.Fatalf( - "VolumeExists(%q) failed. 
Expected: Actual: <%v>", - expectedVolumeName, - volumeExists) - } - - if podExistsInVolume := fakesDSW.PodExistsInVolume( - podName, expectedVolumeName); podExistsInVolume { - t.Fatalf( - "DSW PodExistsInVolume returned incorrect value. Expected: Actual: <%v>", - podExistsInVolume) - } + return dswp, fakeRuntime, pod, expectedVolumeName, pv } func TestFindAndRemoveNonattachableVolumes(t *testing.T) { diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 72e5f494b3d..1245f46679e 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -113,6 +113,14 @@ type VolumeManager interface { // volumes. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap + // GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes + // referenced by the specified pod that are either successfully attached + // and mounted or are "uncertain", i.e. a volume plugin may be mounting + // them right now. The key in the map is the OuterVolumeSpecName (i.e. + // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no + // volumes. + GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap + // GetExtraSupplementalGroupsForPod returns a list of the extra // supplemental groups for the Pod. These extra supplemental groups come // from annotations on persistent volumes that the pod depends on. @@ -290,6 +298,19 @@ func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) co return podVolumes } +func (vm *volumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap { + podVolumes := make(container.VolumeMap) + for _, mountedVolume := range vm.actualStateOfWorld.GetPossiblyMountedVolumesForPod(podName) { + podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{ + Mounter: mountedVolume.Mounter, + BlockVolumeMapper: mountedVolume.BlockVolumeMapper, + ReadOnly: mountedVolume.VolumeSpec.ReadOnly, + InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName, + } + } + return podVolumes +} + func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 { podName := util.GetUniquePodName(pod) supplementalGroups := sets.NewString() diff --git a/pkg/kubelet/volumemanager/volume_manager_fake.go b/pkg/kubelet/volumemanager/volume_manager_fake.go index b21d34a14ce..ad4e87e7ad8 100644 --- a/pkg/kubelet/volumemanager/volume_manager_fake.go +++ b/pkg/kubelet/volumemanager/volume_manager_fake.go @@ -17,7 +17,7 @@ limitations under the License. 
package volumemanager import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/volume/util/types" @@ -55,6 +55,11 @@ func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) return nil } +// GetPossiblyMountedVolumesForPod is not implemented +func (f *FakeVolumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap { + return nil +} + // GetExtraSupplementalGroupsForPod is not implemented func (f *FakeVolumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 { return nil diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 48ba1076e0d..13bcfd2a567 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -821,6 +821,22 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc( // Execute unmount unmountErr := volumeUnmounter.TearDown() if unmountErr != nil { + // Mark the volume as uncertain, so SetUp is called for new pods. Teardown may be already in progress. + opts := MarkVolumeOpts{ + PodName: volumeToUnmount.PodName, + PodUID: volumeToUnmount.PodUID, + VolumeName: volumeToUnmount.VolumeName, + OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName, + VolumeGidVolume: volumeToUnmount.VolumeGidValue, + VolumeSpec: volumeToUnmount.VolumeSpec, + VolumeMountState: VolumeMountUncertain, + } + markMountUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(opts) + if markMountUncertainErr != nil { + // There is nothing else we can do. Hope that UnmountVolume will be re-tried shortly. + klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeMountAsUncertain failed", markMountUncertainErr).Error()) + } + // On failure, return error. Caller will log and retry. eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) @@ -907,6 +923,13 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc( // Execute unmount unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath) if unmountDeviceErr != nil { + // Mark the device as uncertain, so MountDevice is called for new pods. UnmountDevice may be already in progress. + markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath) + if markDeviceUncertainErr != nil { + // There is nothing else we can do. Hope that UnmountDevice will be re-tried shortly. + klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error()) + } + // On failure, return error. Caller will log and retry. eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr) return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) @@ -1208,6 +1231,25 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc( // plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID} globalUnmapPath := volumeToUnmount.DeviceMountPath + // Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err" + // cases below. The volume is marked as fully un-mapped at the end of this function, when everything + // succeeds. 
+ markVolumeOpts := MarkVolumeOpts{ + PodName: volumeToUnmount.PodName, + PodUID: volumeToUnmount.PodUID, + VolumeName: volumeToUnmount.VolumeName, + OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName, + VolumeGidVolume: volumeToUnmount.VolumeGidValue, + VolumeSpec: volumeToUnmount.VolumeSpec, + VolumeMountState: VolumeMountUncertain, + } + markVolumeUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts) + if markVolumeUncertainErr != nil { + // On failure, return error. Caller will log and retry. + eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.MarkDeviceAsUncertain failed", markVolumeUncertainErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + // Execute common unmap unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID) if unmapErr != nil { @@ -1309,6 +1351,17 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc( return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) } + // Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err" + // cases below. The volume is marked as fully un-mapped at the end of this function, when everything + // succeeds. + markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain( + deviceToDetach.VolumeName, deviceToDetach.DevicePath, globalMapPath) + if markDeviceUncertainErr != nil { + // On failure, return error. Caller will log and retry. + eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr) + return volumetypes.NewOperationContext(eventErr, detailedErr, migrated) + } + // Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok { // Execute tear down device diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 161a238b4e5..454c1e1699e 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -898,6 +898,128 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { } }) + ginkgo.Context("CSI NodeUnstage error cases [Slow]", func() { + trackedCalls := []string{ + "NodeStageVolume", + "NodeUnstageVolume", + } + + // Each test starts two pods in sequence. + // The first pod always runs successfully, but NodeUnstage hook can set various error conditions. + // The test then checks how NodeStage of the second pod is called. + tests := []struct { + name string + expectedCalls []csiCall + + // Called for each NodeStageVolume calls, with counter incremented atomically before + // the invocation (i.e. first value will be 1) and index of deleted pod (the first pod + // has index 1) + nodeUnstageHook func(counter, pod int64) error + }{ + { + // This is already tested elsewhere, adding simple good case here to test the test framework. 
+ name: "should call NodeStage after NodeUnstage success", + expectedCalls: []csiCall{ + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, + }, + }, + { + name: "two pods: should call NodeStage after previous NodeUnstage final error", + expectedCalls: []csiCall{ + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.InvalidArgument}, + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, + }, + nodeUnstageHook: func(counter, pod int64) error { + if pod == 1 { + return status.Error(codes.InvalidArgument, "fake final error") + } + return nil + }, + }, + { + name: "two pods: should call NodeStage after previous NodeUnstage transient error", + expectedCalls: []csiCall{ + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.DeadlineExceeded}, + {expectedMethod: "NodeStageVolume", expectedError: codes.OK}, + {expectedMethod: "NodeUnstageVolume", expectedError: codes.OK}, + }, + nodeUnstageHook: func(counter, pod int64) error { + if pod == 1 { + return status.Error(codes.DeadlineExceeded, "fake transient error") + } + return nil + }, + }, + } + for _, t := range tests { + test := t + ginkgo.It(test.name, func() { + // Index of the last deleted pod. NodeUnstage calls are then related to this pod. + var deletedPodNumber int64 = 1 + var hooks *drivers.Hooks + if test.nodeUnstageHook != nil { + hooks = createPreHook("NodeUnstageVolume", func(counter int64) error { + pod := atomic.LoadInt64(&deletedPodNumber) + return test.nodeUnstageHook(counter, pod) + }) + } + init(testParameters{ + disableAttach: true, + registerDriver: true, + hooks: hooks, + }) + defer cleanup() + + _, claim, pod := createPod(false) + if pod == nil { + return + } + // Wait for PVC to get bound to make sure the CSI driver is fully started. + err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout) + framework.ExpectNoError(err, "while waiting for PVC to get provisioned") + err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "while waiting for the first pod to start") + err = e2epod.DeletePodWithWait(m.cs, pod) + framework.ExpectNoError(err, "while deleting the first pod") + + // Create the second pod + pod, err = createPodWithPVC(claim) + framework.ExpectNoError(err, "while creating the second pod") + err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "while waiting for the second pod to start") + // The second pod is running and kubelet can't call NodeUnstage of the first one. + // Therefore incrementing the pod counter is safe here. 
+ atomic.AddInt64(&deletedPodNumber, 1) + err = e2epod.DeletePodWithWait(m.cs, pod) + framework.ExpectNoError(err, "while deleting the second pod") + + ginkgo.By("Waiting for all remaining expected CSI calls") + err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) { + _, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls) + if err != nil { + return true, err + } + if index == 0 { + // No CSI call received yet + return false, nil + } + if len(test.expectedCalls) == index { + // all calls received + return true, nil + } + return false, nil + }) + framework.ExpectNoError(err, "while waiting for all CSI calls") + }) + } + }) + ginkgo.Context("storage capacity", func() { tests := []struct { name string
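// Editor's illustrative summary sketch (not part of the patch): the semantics that the
// new ActualStateOfWorld queries are expected to have for a volume whose mount state is
// uncertain, written as a hypothetical test helper against the interface shown in the
// actual_state_of_world.go hunks above.
func verifyUncertainMountSemantics(t *testing.T, asw cache.ActualStateOfWorld,
	podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) {
	// An uncertain mount is not reported as a confirmed mount...
	if exists, _, _ := asw.PodExistsInVolume(podName, volumeName); exists {
		t.Errorf("volume %s in uncertain state should not be reported by PodExistsInVolume", volumeName)
	}
	// ...but it is also not treated as removed, so unmount keeps being retried...
	if asw.PodRemovedFromVolume(podName, volumeName) {
		t.Errorf("volume %s in uncertain state should not be reported as removed", volumeName)
	}
	// ...and it shows up in the "possibly mounted" listing that pod cleanup now uses.
	if len(asw.GetPossiblyMountedVolumesForPod(podName)) == 0 {
		t.Errorf("volume %s in uncertain state should be returned by GetPossiblyMountedVolumesForPod", volumeName)
	}
}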