Merge pull request #100183 from jsafrane/fix-unstage-retry

Mark volume as uncertain after Unmount* fails

commit 4afb72a863
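In short: when TearDown, UnmountDevice, UnmapVolume or UnmapDevice cannot confirm a clean unmount, the kubelet now records the volume or device as "uncertain" in its actual state of the world, so the unmount keeps being retried and a new pod that uses the same volume goes through SetUp/MountDevice again. A minimal sketch of that retry contract, using hypothetical types rather than the kubelet's own code:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical mount states, mirroring the idea of VolumeMounted,
// VolumeMountUncertain and "not mounted at all".
type mountState int

const (
	notMounted mountState = iota
	mounted
	uncertain
)

// tearDown simulates an unmount attempt that may fail.
func tearDown(fail bool) error {
	if fail {
		return errors.New("tear down failed")
	}
	return nil
}

// reconcileUnmount mirrors the commit's pattern: a failed unmount downgrades
// the state to "uncertain" instead of leaving it as "mounted", so the next
// reconciliation pass tries the unmount again.
func reconcileUnmount(state mountState, fail bool) mountState {
	if state == notMounted {
		return state // nothing left to do
	}
	if err := tearDown(fail); err != nil {
		fmt.Println("marking volume as uncertain after:", err)
		return uncertain
	}
	return notMounted
}

func main() {
	state := mounted
	state = reconcileUnmount(state, true)  // first attempt fails -> uncertain
	state = reconcileUnmount(state, false) // retried because state is not "not mounted"
	fmt.Println("fully unmounted:", state == notMounted)
}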
@@ -71,10 +71,10 @@ func (kl *Kubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.B
}

// podVolumesExist checks with the volume manager and returns true any of the
// pods for the specified volume are mounted.
// pods for the specified volume are mounted or are uncertain.
func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
if mountedVolumes :=
kl.volumeManager.GetMountedVolumesForPod(
kl.volumeManager.GetPossiblyMountedVolumesForPod(
volumetypes.UniquePodName(podUID)); len(mountedVolumes) > 0 {
return true
}
@@ -397,6 +397,9 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {

podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
util.GetUniquePodName(pod))
allPodVolumes := kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
util.GetUniquePodName(pod))
assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)

@@ -476,6 +479,9 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {

podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
util.GetUniquePodName(pod))
allPodVolumes := kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
util.GetUniquePodName(pod))
assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)

@@ -500,6 +506,9 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
// Verify volumes unmounted
podVolumes = kubelet.volumeManager.GetMountedVolumesForPod(
util.GetUniquePodName(pod))
allPodVolumes = kubelet.volumeManager.GetPossiblyMountedVolumesForPod(
util.GetUniquePodName(pod))
assert.Equal(t, podVolumes, allPodVolumes, "GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod should return the same volumes")

assert.Len(t, podVolumes, 0,
"Expected volumes to be unmounted and detached. But some volumes are still mounted: %#v", podVolumes)
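The assertions above compare the two accessors directly: in these tests nothing is left in the uncertain state, so GetMountedVolumesForPod and GetPossiblyMountedVolumesForPod must agree, and the "possibly mounted" view only grows when a mount is uncertain. A rough sketch of that relationship with made-up data, not the kubelet's real structures:

package main

import "fmt"

// Stand-in mount states for a single pod's volumes.
type state int

const (
	mounted state = iota
	uncertainlyMounted
)

// mountedOnly mimics GetMountedVolumesForPod: confirmed mounts only.
func mountedOnly(vols map[string]state) []string {
	var out []string
	for name, s := range vols {
		if s == mounted {
			out = append(out, name)
		}
	}
	return out
}

// possiblyMounted mimics GetPossiblyMountedVolumesForPod: confirmed or uncertain.
func possiblyMounted(vols map[string]state) []string {
	var out []string
	for name, s := range vols {
		if s == mounted || s == uncertainlyMounted {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	vols := map[string]state{"vol1": mounted}
	// No uncertain mounts: both views report the same volumes,
	// which is what the assertions in the tests above check.
	fmt.Println(len(mountedOnly(vols)) == len(possiblyMounted(vols))) // true

	vols["vol2"] = uncertainlyMounted
	// An uncertain mount appears only in the "possibly mounted" view.
	fmt.Println(len(mountedOnly(vols)), len(possiblyMounted(vols))) // 1 2
}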
@@ -110,6 +110,14 @@ type ActualStateOfWorld interface {
// volumes that do not need to update contents should not fail.
PodExistsInVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) (bool, string, error)

// PodRemovedFromVolume returns true if the given pod does not exist in the list of
// mountedPods for the given volume in the cache, indicating that the pod has
// fully unmounted it or it was never mounted the volume.
// If the volume is fully mounted or is in uncertain mount state for the pod, it is
// considered that the pod still exists in volume manager's actual state of the world
// and false is returned.
PodRemovedFromVolume(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) bool

// VolumeExistsWithSpecName returns true if the given volume specified with the
// volume spec name (a.k.a., InnerVolumeSpecName) exists in the list of
// volumes that should be attached to this node.

@@ -136,6 +144,11 @@ type ActualStateOfWorld interface {
// current actual state of the world.
GetMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume

// GetPossiblyMountedVolumesForPod generates and returns a list of volumes for
// the specified pod that either are attached and mounted or are "uncertain",
// i.e. a volume plugin may be mounting the volume right now.
GetPossiblyMountedVolumesForPod(podName volumetypes.UniquePodName) []MountedVolume

// GetGloballyMountedVolumes generates and returns a list of all attached
// volumes that are globally mounted. This list can be used to determine
// which volumes should be reported as "in use" in the node's VolumesInUse
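The contract documented above for PodRemovedFromVolume boils down to a small truth table over the pod's mount state: only a missing entry counts as removed, while both a confirmed and an uncertain mount keep the pod "present". A minimal sketch, with a hypothetical state type instead of the real cache:

package main

import "fmt"

// Hypothetical per-pod mount state for one volume.
type mountState int

const (
	noEntry mountState = iota // pod has no record for the volume
	volumeMounted
	volumeMountUncertain
)

// podRemoved mirrors the documented contract of PodRemovedFromVolume:
// true only when there is no entry at all; both a confirmed and an
// uncertain mount mean the pod still "exists" in the actual state.
func podRemoved(s mountState) bool {
	switch s {
	case volumeMounted, volumeMountUncertain:
		return false
	default:
		return true
	}
}

func main() {
	fmt.Println(podRemoved(noEntry))              // true: safe to drop from desired state
	fmt.Println(podRemoved(volumeMounted))        // false: still mounted
	fmt.Println(podRemoved(volumeMountUncertain)) // false: keep trying to unmount
}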
@@ -681,6 +694,31 @@ func (asw *actualStateOfWorld) PodExistsInVolume(
return podExists, volumeObj.devicePath, nil
}

func (asw *actualStateOfWorld) PodRemovedFromVolume(
podName volumetypes.UniquePodName,
volumeName v1.UniqueVolumeName) bool {
asw.RLock()
defer asw.RUnlock()

volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return true
}

podObj, podExists := volumeObj.mountedPods[podName]
if podExists {
// if volume mount was uncertain we should keep trying to unmount the volume
if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain {
return false
}
if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
return false
}
}

return true
}

func (asw *actualStateOfWorld) VolumeExistsWithSpecName(podName volumetypes.UniquePodName, volumeSpecName string) bool {
asw.RLock()
defer asw.RUnlock()

@@ -757,6 +795,26 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod(
return mountedVolume
}

func (asw *actualStateOfWorld) GetPossiblyMountedVolumesForPod(
podName volumetypes.UniquePodName) []MountedVolume {
asw.RLock()
defer asw.RUnlock()
mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for mountedPodName, podObj := range volumeObj.mountedPods {
if mountedPodName == podName &&
(podObj.volumeMountStateForPod == operationexecutor.VolumeMounted ||
podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain) {
mountedVolume = append(
mountedVolume,
getMountedVolume(&podObj, &volumeObj))
}
}
}

return mountedVolume
}

func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
@@ -20,7 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/volume"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"

@@ -653,13 +653,28 @@ func TestUncertainVolumeMounts(t *testing.T) {
}
}
if volumeFound {
t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name())
t.Fatalf("expected volume %s to be not found in asw.GetMountedVolumesForPod", volumeSpec1.Name())
}

possiblyMountedVolumes := asw.GetPossiblyMountedVolumesForPod(podName1)
volumeFound = false
for _, volume := range possiblyMountedVolumes {
if volume.InnerVolumeSpecName == volumeSpec1.Name() {
volumeFound = true
}
}
if !volumeFound {
t.Fatalf("expected volume %s to be found in aws.GetPossiblyMountedVolumesForPod", volumeSpec1.Name())
}

volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1)
if volExists {
t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1)
}
removed := asw.PodRemovedFromVolume(podName1, generatedVolumeName1)
if removed {
t.Fatalf("expected volume %s not to be removed in asw", generatedVolumeName1)
}
}

func verifyVolumeExistsInGloballyMountedVolumes(
@@ -278,12 +278,12 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
klog.V(4).InfoS("Pod still has one or more containers in the non-exited state and will not be removed from desired state", "pod", klog.KObj(volumeToMount.Pod))
continue
}
exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
var volumeToMountSpecName string
if volumeToMount.VolumeSpec != nil {
volumeToMountSpecName = volumeToMount.VolumeSpec.Name()
}
if !exists && podExists {
removed := dswp.actualStateOfWorld.PodRemovedFromVolume(volumeToMount.PodName, volumeToMount.VolumeName)
if removed && podExists {
klog.V(4).InfoS("Actual state does not yet have volume mount information and pod still exists in pod manager, skip removing volume from desired state", "pod", klog.KObj(volumeToMount.Pod), "podUID", volumeToMount.Pod.UID, "volumeName", volumeToMountSpecName)
continue
}
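The populator change above replaces the PodExistsInVolume check with PodRemovedFromVolume: a terminated pod's volume is kept in the desired state only while the actual state has no record of the mount at all and the pod is still known to the pod manager; once the mount is recorded, even as uncertain, removal from the desired state proceeds so the reconciler can unmount it. A simplified sketch of that decision, not the populator's actual code:

package main

import "fmt"

// keepInDesiredState mirrors the updated findAndRemoveDeletedPods decision
// (simplified): a terminated pod's volume is kept in the desired state only
// while the actual state has no mount record for it and the pod is still
// present in the pod manager. In every other case the volume is removed from
// the desired state so the reconciler can unmount it.
func keepInDesiredState(removedFromActualState, podStillInPodManager bool) bool {
	return removedFromActualState && podStillInPodManager
}

func main() {
	fmt.Println(keepInDesiredState(true, true))   // true: wait for mount information
	fmt.Println(keepInDesiredState(false, true))  // false: mounted or uncertain, let it be unmounted
	fmt.Println(keepInDesiredState(true, false))  // false: pod is gone
	fmt.Println(keepInDesiredState(false, false)) // false
}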
@@ -156,6 +156,184 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
}

func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
dswp, fakeRuntime, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)

//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)

fakeRuntime.PodList = []*containertest.FakePod{
{
Pod: &kubecontainer.Pod{
Name: pod.Name,
ID: pod.UID,
Sandboxes: []*kubecontainer.Container{
{
Name: "dswp-test-pod-sandbox",
},
},
},
},
}

dswp.findAndRemoveDeletedPods()

if !dswp.pods.processedPods[podName] {
t.Fatalf("Pod should not been removed from desired state of world since sandbox exist")
}

fakeRuntime.PodList = nil

// fakeRuntime can not get the pod,so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}

volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
podName, expectedVolumeName); podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
podExistsInVolume)
}

volumesToMount := dswp.desiredStateOfWorld.GetVolumesToMount()
for _, volume := range volumesToMount {
if volume.VolumeName == expectedVolumeName {
t.Fatalf(
"Found volume %v in the list of desired state of world volumes to mount. Expected not",
expectedVolumeName)
}
}
}

func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
dswp, _, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)

//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed

dswp.findAndRemoveDeletedPods()
// Although Pod status is terminated, pod still exists in pod manager and actual state does not has this pod and volume information
// desired state populator will fail to delete this pod and volume first
volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
if !volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
podName, expectedVolumeName); !podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <true> Actual: <%v>",
podExistsInVolume)
}

// reconcile with actual state so that volume is added into the actual state
// desired state populator now can successfully delete the pod and volume
fakeASW := dswp.actualStateOfWorld
reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)
dswp.findAndRemoveDeletedPods()
volumeExists = dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
podName, expectedVolumeName); podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
podExistsInVolume)
}
}

func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
dswp, fakeRuntime, pod, expectedVolumeName, pv := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)

//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)
fakeRuntime.PodList = nil

// Add the volume to ASW by reconciling.
fakeASW := dswp.actualStateOfWorld
reconcileASW(fakeASW, dswp.desiredStateOfWorld, t)

// Mark the volume as uncertain
opts := operationexecutor.MarkVolumeOpts{
PodName: util.GetUniquePodName(pod),
PodUID: pod.UID,
VolumeName: expectedVolumeName,
OuterVolumeSpecName: "dswp-test-volume-name",
VolumeGidVolume: "",
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
err := dswp.actualStateOfWorld.MarkVolumeMountAsUncertain(opts)
if err != nil {
t.Fatalf("Failed to set the volume as uncertain: %s", err)
}

// fakeRuntime can not get the pod,so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}

volumeExists := dswp.desiredStateOfWorld.VolumeExists(expectedVolumeName)
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := dswp.desiredStateOfWorld.PodExistsInVolume(
podName, expectedVolumeName); podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
podExistsInVolume)
}

volumesToMount := dswp.desiredStateOfWorld.GetVolumesToMount()
for _, volume := range volumesToMount {
if volume.VolumeName == expectedVolumeName {
t.Fatalf(
"Found volume %v in the list of desired state of world volumes to mount. Expected not",
expectedVolumeName)
}
}
}

func prepareDSWPWithPodPV(t *testing.T) (*desiredStateOfWorldPopulator, *containertest.FakeRuntime, *v1.Pod, v1.UniqueVolumeName, *v1.PersistentVolume) {
// create dswp
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
@@ -221,181 +399,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {

verifyVolumeExistsInVolumesToMount(
t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW)

//let the pod be terminated
podGet, exist := fakePodManager.GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed

fakePodManager.DeletePod(pod)

fakeRuntime.PodList = []*containertest.FakePod{
{
Pod: &kubecontainer.Pod{
Name: pod.Name,
ID: pod.UID,
Sandboxes: []*kubecontainer.Container{
{
Name: "dswp-test-pod-sandbox",
},
},
},
},
}

dswp.findAndRemoveDeletedPods()

if !dswp.pods.processedPods[podName] {
t.Fatalf("Pod should not been removed from desired state of world since sandbox exist")
}

fakeRuntime.PodList = nil

// fakeRuntime can not get the pod,so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
dswp.findAndRemoveDeletedPods()
if dswp.pods.processedPods[podName] {
t.Fatalf("Failed to remove pods from desired state of world since they no longer exist")
}

volumeExists = fakesDSW.VolumeExists(expectedVolumeName)
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := fakesDSW.PodExistsInVolume(
podName, expectedVolumeName); podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
podExistsInVolume)
}

volumesToMount := fakesDSW.GetVolumesToMount()
for _, volume := range volumesToMount {
if volume.VolumeName == expectedVolumeName {
t.Fatalf(
"Found volume %v in the list of desired state of world volumes to mount. Expected not",
expectedVolumeName)
}
}

}

func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
// create dswp
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "dswp-test-volume-name",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"},
VolumeMode: &mode,
},
}
pvc := &v1.PersistentVolumeClaim{
Spec: v1.PersistentVolumeClaimSpec{
VolumeName: "dswp-test-volume-name",
},
Status: v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
},
}
dswp, fakePodManager, fakesDSW, _ := createDswpWithVolume(t, pv, pvc)

// create pod
containers := []v1.Container{
{
VolumeMounts: []v1.VolumeMount{
{
Name: "dswp-test-volume-name",
MountPath: "/mnt",
},
},
},
}
pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers)

fakePodManager.AddPod(pod)

podName := util.GetUniquePodName(pod)

generatedVolumeName := "fake-plugin/" + pod.Spec.Volumes[0].Name

dswp.findAndAddNewPods()

if !dswp.pods.processedPods[podName] {
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
}

expectedVolumeName := v1.UniqueVolumeName(generatedVolumeName)

volumeExists := fakesDSW.VolumeExists(expectedVolumeName)
if !volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := fakesDSW.PodExistsInVolume(
podName, expectedVolumeName); !podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <true> Actual: <%v>",
podExistsInVolume)
}

verifyVolumeExistsInVolumesToMount(
t, v1.UniqueVolumeName(generatedVolumeName), false /* expectReportedInUse */, fakesDSW)

//let the pod be terminated
podGet, exist := fakePodManager.GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed

dswp.findAndRemoveDeletedPods()
// Although Pod status is terminated, pod still exists in pod manager and actual state does not has this pod and volume information
// desired state populator will fail to delete this pod and volume first
volumeExists = fakesDSW.VolumeExists(expectedVolumeName)
if !volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <true> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := fakesDSW.PodExistsInVolume(
podName, expectedVolumeName); !podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <true> Actual: <%v>",
podExistsInVolume)
}

// reconcile with actual state so that volume is added into the actual state
// desired state populator now can successfully delete the pod and volume
fakeASW := dswp.actualStateOfWorld
reconcileASW(fakeASW, fakesDSW, t)
dswp.findAndRemoveDeletedPods()
volumeExists = fakesDSW.VolumeExists(expectedVolumeName)
if volumeExists {
t.Fatalf(
"VolumeExists(%q) failed. Expected: <false> Actual: <%v>",
expectedVolumeName,
volumeExists)
}

if podExistsInVolume := fakesDSW.PodExistsInVolume(
podName, expectedVolumeName); podExistsInVolume {
t.Fatalf(
"DSW PodExistsInVolume returned incorrect value. Expected: <false> Actual: <%v>",
podExistsInVolume)
}
return dswp, fakeRuntime, pod, expectedVolumeName, pv
}

func TestFindAndRemoveNonattachableVolumes(t *testing.T) {
@@ -113,6 +113,14 @@ type VolumeManager interface {
// volumes.
GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

// GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes
// referenced by the specified pod that are either successfully attached
// and mounted or are "uncertain", i.e. a volume plugin may be mounting
// them right now. The key in the map is the OuterVolumeSpecName (i.e.
// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
// volumes.
GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

// GetExtraSupplementalGroupsForPod returns a list of the extra
// supplemental groups for the Pod. These extra supplemental groups come
// from annotations on persistent volumes that the pod depends on.
@@ -290,6 +298,19 @@ func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) co
return podVolumes
}

func (vm *volumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
podVolumes := make(container.VolumeMap)
for _, mountedVolume := range vm.actualStateOfWorld.GetPossiblyMountedVolumesForPod(podName) {
podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
Mounter: mountedVolume.Mounter,
BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
}
}
return podVolumes
}

func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
podName := util.GetUniquePodName(pod)
supplementalGroups := sets.NewString()
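As the interface comment above notes, the returned VolumeMap is keyed by the OuterVolumeSpecName, i.e. pod.Spec.Volumes[x].Name, while each entry carries the resolved InnerVolumeSpecName. A small sketch of building such a map with stand-in types (container.VolumeInfo and MountedVolume are only mimicked here):

package main

import "fmt"

// volumeInfo stands in for container.VolumeInfo; only a couple of fields
// are mimicked here.
type volumeInfo struct {
	innerVolumeSpecName string
	readOnly            bool
}

// possiblyMountedVolume stands in for the MountedVolume entries returned by
// the actual state of the world for mounted or uncertain volumes.
type possiblyMountedVolume struct {
	outerVolumeSpecName string // pod.Spec.Volumes[x].Name
	innerVolumeSpecName string
	readOnly            bool
}

// toVolumeMap mirrors GetPossiblyMountedVolumesForPod above: entries are
// keyed by the OuterVolumeSpecName so callers can look a volume up by the
// name used in the pod spec.
func toVolumeMap(vols []possiblyMountedVolume) map[string]volumeInfo {
	m := make(map[string]volumeInfo, len(vols))
	for _, v := range vols {
		m[v.outerVolumeSpecName] = volumeInfo{
			innerVolumeSpecName: v.innerVolumeSpecName,
			readOnly:            v.readOnly,
		}
	}
	return m
}

func main() {
	vols := []possiblyMountedVolume{
		{outerVolumeSpecName: "data", innerVolumeSpecName: "pvc-1234", readOnly: false},
	}
	m := toVolumeMap(vols)
	fmt.Println(m["data"].innerVolumeSpecName) // pvc-1234
}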
@@ -17,7 +17,7 @@ limitations under the License.
package volumemanager

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/volume/util/types"

@@ -55,6 +55,11 @@ func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName)
return nil
}

// GetPossiblyMountedVolumesForPod is not implemented
func (f *FakeVolumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
return nil
}

// GetExtraSupplementalGroupsForPod is not implemented
func (f *FakeVolumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
return nil
@@ -821,6 +821,22 @@ func (og *operationGenerator) GenerateUnmountVolumeFunc(
// Execute unmount
unmountErr := volumeUnmounter.TearDown()
if unmountErr != nil {
// Mark the volume as uncertain, so SetUp is called for new pods. Teardown may be already in progress.
opts := MarkVolumeOpts{
PodName: volumeToUnmount.PodName,
PodUID: volumeToUnmount.PodUID,
VolumeName: volumeToUnmount.VolumeName,
OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
VolumeGidVolume: volumeToUnmount.VolumeGidValue,
VolumeSpec: volumeToUnmount.VolumeSpec,
VolumeMountState: VolumeMountUncertain,
}
markMountUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(opts)
if markMountUncertainErr != nil {
// There is nothing else we can do. Hope that UnmountVolume will be re-tried shortly.
klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeMountAsUncertain failed", markMountUncertainErr).Error())
}

// On failure, return error. Caller will log and retry.
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
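The error path above does two things in a fixed order: it first downgrades the mount to "uncertain" (logging, but otherwise ignoring, a failure of that bookkeeping step) and then still returns the TearDown error so the caller retries the whole unmount. A compressed sketch of that ordering with hypothetical helpers, not the operation executor's API:

package main

import (
	"errors"
	"fmt"
)

// unmountOnce mirrors the structure of the error path above: when the
// tear-down fails, the mount is marked uncertain first (a failure of that
// bookkeeping step is only logged), and the original tear-down error is
// still returned so the caller retries the whole operation later.
func unmountOnce(tearDown, markUncertain func() error, logf func(string, ...interface{})) error {
	if err := tearDown(); err != nil {
		if markErr := markUncertain(); markErr != nil {
			// Nothing else can be done here; the retry of the whole
			// unmount operation is the fallback.
			logf("marking mount as uncertain failed: %v", markErr)
		}
		return err
	}
	return nil
}

func main() {
	logf := func(format string, args ...interface{}) { fmt.Printf(format+"\n", args...) }
	failTearDown := func() error { return errors.New("device busy") }
	markUncertain := func() error { return nil }

	if err := unmountOnce(failTearDown, markUncertain, logf); err != nil {
		fmt.Println("caller will log and retry:", err)
	}
}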
@@ -907,6 +923,13 @@ func (og *operationGenerator) GenerateUnmountDeviceFunc(
// Execute unmount
unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath)
if unmountDeviceErr != nil {
// Mark the device as uncertain, so MountDevice is called for new pods. UnmountDevice may be already in progress.
markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath)
if markDeviceUncertainErr != nil {
// There is nothing else we can do. Hope that UnmountDevice will be re-tried shortly.
klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error())
}

// On failure, return error. Caller will log and retry.
eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
@@ -1208,6 +1231,25 @@ func (og *operationGenerator) GenerateUnmapVolumeFunc(
// plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
globalUnmapPath := volumeToUnmount.DeviceMountPath

// Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err"
// cases below. The volume is marked as fully un-mapped at the end of this function, when everything
// succeeds.
markVolumeOpts := MarkVolumeOpts{
PodName: volumeToUnmount.PodName,
PodUID: volumeToUnmount.PodUID,
VolumeName: volumeToUnmount.VolumeName,
OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
VolumeGidVolume: volumeToUnmount.VolumeGidValue,
VolumeSpec: volumeToUnmount.VolumeSpec,
VolumeMountState: VolumeMountUncertain,
}
markVolumeUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts)
if markVolumeUncertainErr != nil {
// On failure, return error. Caller will log and retry.
eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.MarkDeviceAsUncertain failed", markVolumeUncertainErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}

// Execute common unmap
unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
if unmapErr != nil {

@@ -1309,6 +1351,17 @@ func (og *operationGenerator) GenerateUnmapDeviceFunc(
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}

// Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err"
// cases below. The volume is marked as fully un-mapped at the end of this function, when everything
// succeeds.
markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(
deviceToDetach.VolumeName, deviceToDetach.DevicePath, globalMapPath)
if markDeviceUncertainErr != nil {
// On failure, return error. Caller will log and retry.
eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr)
return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
}

// Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
// Execute tear down device
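For block volumes the pattern is inverted: UnmapVolume and UnmapDevice mark the volume or device as uncertain up front, before anything that can fail, so every early "return err" leaves the retry trigger in place, and only mark it fully un-mapped at the end on success. A minimal sketch of that shape, with illustrative names only:

package main

import (
	"errors"
	"fmt"
)

// unmapDevice mirrors the block-volume pattern above: the device is marked
// uncertain before any step that can fail, so every early error return
// leaves a retry trigger behind, and it is only marked fully un-mapped at
// the very end when all steps succeeded. Names are illustrative.
func unmapDevice(steps []func() error, mark func(state string)) error {
	mark("uncertain")
	for _, step := range steps {
		if err := step(); err != nil {
			return err // still "uncertain" -> kubelet retries the unmap
		}
	}
	mark("unmapped")
	return nil
}

func main() {
	state := "mapped"
	mark := func(s string) { state = s }

	// First attempt fails halfway through; the state stays "uncertain".
	_ = unmapDevice([]func() error{
		func() error { return nil },
		func() error { return errors.New("unmap bind mount failed") },
	}, mark)
	fmt.Println(state) // uncertain

	// The retry succeeds and clears the record.
	_ = unmapDevice([]func() error{func() error { return nil }}, mark)
	fmt.Println(state) // unmapped
}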
@@ -898,6 +898,128 @@ var _ = utils.SIGDescribe("CSI mock volume", func() {
}
})

ginkgo.Context("CSI NodeUnstage error cases [Slow]", func() {
trackedCalls := []string{
"NodeStageVolume",
"NodeUnstageVolume",
}

// Each test starts two pods in sequence.
// The first pod always runs successfully, but NodeUnstage hook can set various error conditions.
// The test then checks how NodeStage of the second pod is called.
tests := []struct {
name string
expectedCalls []csiCall

// Called for each NodeStageVolume calls, with counter incremented atomically before
// the invocation (i.e. first value will be 1) and index of deleted pod (the first pod
// has index 1)
nodeUnstageHook func(counter, pod int64) error
}{
{
// This is already tested elsewhere, adding simple good case here to test the test framework.
name: "should call NodeStage after NodeUnstage success",
expectedCalls: []csiCall{
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
},
{
name: "two pods: should call NodeStage after previous NodeUnstage final error",
expectedCalls: []csiCall{
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.InvalidArgument},
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeUnstageHook: func(counter, pod int64) error {
if pod == 1 {
return status.Error(codes.InvalidArgument, "fake final error")
}
return nil
},
},
{
name: "two pods: should call NodeStage after previous NodeUnstage transient error",
expectedCalls: []csiCall{
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.DeadlineExceeded},
{expectedMethod: "NodeStageVolume", expectedError: codes.OK},
{expectedMethod: "NodeUnstageVolume", expectedError: codes.OK},
},
nodeUnstageHook: func(counter, pod int64) error {
if pod == 1 {
return status.Error(codes.DeadlineExceeded, "fake transient error")
}
return nil
},
},
}
for _, t := range tests {
test := t
ginkgo.It(test.name, func() {
// Index of the last deleted pod. NodeUnstage calls are then related to this pod.
var deletedPodNumber int64 = 1
var hooks *drivers.Hooks
if test.nodeUnstageHook != nil {
hooks = createPreHook("NodeUnstageVolume", func(counter int64) error {
pod := atomic.LoadInt64(&deletedPodNumber)
return test.nodeUnstageHook(counter, pod)
})
}
init(testParameters{
disableAttach: true,
registerDriver: true,
hooks: hooks,
})
defer cleanup()

_, claim, pod := createPod(false)
if pod == nil {
return
}
// Wait for PVC to get bound to make sure the CSI driver is fully started.
err := e2epv.WaitForPersistentVolumeClaimPhase(v1.ClaimBound, f.ClientSet, f.Namespace.Name, claim.Name, time.Second, framework.ClaimProvisionTimeout)
framework.ExpectNoError(err, "while waiting for PVC to get provisioned")
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "while waiting for the first pod to start")
err = e2epod.DeletePodWithWait(m.cs, pod)
framework.ExpectNoError(err, "while deleting the first pod")

// Create the second pod
pod, err = createPodWithPVC(claim)
framework.ExpectNoError(err, "while creating the second pod")
err = e2epod.WaitForPodNameRunningInNamespace(m.cs, pod.Name, pod.Namespace)
framework.ExpectNoError(err, "while waiting for the second pod to start")
// The second pod is running and kubelet can't call NodeUnstage of the first one.
// Therefore incrementing the pod counter is safe here.
atomic.AddInt64(&deletedPodNumber, 1)
err = e2epod.DeletePodWithWait(m.cs, pod)
framework.ExpectNoError(err, "while deleting the second pod")

ginkgo.By("Waiting for all remaining expected CSI calls")
err = wait.Poll(time.Second, csiUnstageWaitTimeout, func() (done bool, err error) {
_, index, err := compareCSICalls(trackedCalls, test.expectedCalls, m.driver.GetCalls)
if err != nil {
return true, err
}
if index == 0 {
// No CSI call received yet
return false, nil
}
if len(test.expectedCalls) == index {
// all calls received
return true, nil
}
return false, nil
})
framework.ExpectNoError(err, "while waiting for all CSI calls")
})
}
})
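The e2e test above keeps polling compareCSICalls until the whole expected sequence of NodeStage/NodeUnstage calls, including the injected error, has been observed. A stripped-down sketch of that matching loop with made-up types, not the e2e framework's own helpers:

package main

import "fmt"

// csiCall is a stand-in for the expectation entries used by the test above:
// a gRPC method name plus the error code the driver returned.
type csiCall struct {
	method string
	code   string
}

// matchedPrefix plays the role of compareCSICalls here: it reports how many
// of the expected calls have been observed so far, so the caller can keep
// polling until the full sequence (including the injected error) arrived.
func matchedPrefix(expected, got []csiCall) int {
	n := 0
	for n < len(expected) && n < len(got) && expected[n] == got[n] {
		n++
	}
	return n
}

func main() {
	expected := []csiCall{
		{"NodeStageVolume", "OK"},
		{"NodeUnstageVolume", "DeadlineExceeded"}, // injected for the first pod
		{"NodeStageVolume", "OK"},
		{"NodeUnstageVolume", "OK"},
	}

	var got []csiCall
	// Simulate the calls arriving one by one; a real test would poll the
	// driver's call log on a timer instead.
	for _, c := range expected {
		got = append(got, c)
		if matchedPrefix(expected, got) == len(expected) {
			fmt.Println("all expected CSI calls observed")
		}
	}
}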

ginkgo.Context("storage capacity", func() {
tests := []struct {
name string