Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #45346 from codablock/fix_double_attach
Automatic merge from submit-queue

**Don't try to attach volumes which are already attached to other nodes**

This PR is a replacement for https://github.com/kubernetes/kubernetes/pull/40148. I was not able to push fixes and rebases to the original branch as I no longer have access to the GitHub organization.

CC @saad-ali You probably have to update the PR link in [Q2 2017 (v1.7)](https://docs.google.com/spreadsheets/d/1t4z5DYKjX2ZDlkTpCnp18icRAQqOE85C1T1r2gqJVck/edit#gid=14624465). I assume the PR will need a new "ok to test".

**ORIGINAL PR DESCRIPTION**

This PR fixes an issue with the attach/detach volume controller. There are cases where the `desiredStateOfWorld` contains the same volume for multiple nodes, resulting in the attach/detach controller attaching this volume to multiple nodes. This of course fails for volumes like AWS EBS, Azure Disks, ...

I observed this situation on Azure when using Azure Disks and replication controllers which start to reschedule PODs. When you delete a POD that belongs to an RC, the RC immediately schedules a new POD on another node. This results in a short window (at most a few seconds) where two PODs try to attach/mount the same volume on different nodes. As the old POD is still alive, the attach/detach controller does not try to detach the volume and immediately starts attaching the volume for the new POD.

This behavior was probably not noticed before on other clouds because the bogus attach attempt fails quickly and thus goes unnoticed. As the situation with the two PODs disappears after a few seconds, a detach for the old POD is initiated and the new POD can then attach successfully.

On Azure, however, attaching and detaching take quite long, so the first bogus attach attempt already eats up a lot of time. When attaching fails on Azure with a report that the disk is already attached somewhere else, the cloud provider immediately issues a detach call for the same volume+node it tried to attach to, to make sure the failed attach request is aborted right away. You can find this here: https://github.com/kubernetes/kubernetes/blob/master/pkg/cloudprovider/providers/azure/azure_storage.go#L74

The complete attach->fail->abort flow eats up valuable time, and the attach/detach controller cannot proceed with other work while it is happening. This means that if the old POD disappears in the meantime, the controller can't even start the detach of the volume, which delays the whole process of rescheduling and reattaching.

Also, I and other people have observed very strange behavior where disks ended up "attached" to multiple VMs at the same time, as reported by the Azure Portal. This causes the controller to fail reattaching forever. It's hard to figure out why and when this happens and there is no known reproducer yet, but I can imagine that it correlates with the behavior described above.

I was not sure if there are actually cases where it is perfectly fine to have a volume mounted to multiple PODs/nodes. At least technically, this should be possible with network-based volumes, e.g. NFS. Can someone with more knowledge about volumes help me here? I may need to add a check before skipping attaching in `reconcile`.

CC @colemickens @rootfs

```release-note
Don't try to attach volume to new node if it is already attached to another node and the volume does not support multi-attach.
```
This commit is contained in: f499606bfe
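Before diving into the diff, here is the gist of the fix as a minimal, self-contained Go sketch. The names (`actualState`, `shouldSkipAttach`) are illustrative stand-ins, not the controller's real API: the reconciler now refuses to start an attach for a volume that forbids multi-attach while the actual state of the world still shows it attached to some node, leaving the detach phase to free the volume first.

```go
package main

import "fmt"

// Illustrative stand-ins for the controller's cache types.
type nodeName string
type volumeName string

// actualState maps a volume to the nodes it is currently attached to.
type actualState map[volumeName][]nodeName

// shouldSkipAttach mirrors the new reconciler guard: if multi-attach is
// forbidden and the volume is already attached somewhere, don't start
// another attach; a later reconcile pass retries once detach has finished.
func shouldSkipAttach(asw actualState, v volumeName, multiAttachForbidden bool) bool {
	return multiAttachForbidden && len(asw[v]) > 0
}

func main() {
	asw := actualState{"disk-1": {"node-a"}}
	fmt.Println(shouldSkipAttach(asw, "disk-1", true))  // true: wait for detach
	fmt.Println(shouldSkipAttach(asw, "disk-1", false)) // false: e.g. NFS-style volumes
	fmt.Println(shouldSkipAttach(asw, "disk-2", true))  // false: not attached anywhere
}
```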
```diff
@@ -111,6 +111,9 @@ type ActualStateOfWorld interface {
 
 	GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume
 
+	// GetNodesForVolume returns the nodes on which the volume is attached
+	GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName
+
 	// GetVolumesToReportAttached returns a map containing the set of nodes for
 	// which the VolumesAttached Status field in the Node API object should be
 	// updated. The key in this map is the name of the node to update and the
```
```diff
@@ -581,6 +584,22 @@ func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]
 	return attachedVolumesPerNode
 }
 
+func (asw *actualStateOfWorld) GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
+	if !volumeExists || len(volumeObj.nodesAttachedTo) == 0 {
+		return []types.NodeName{}
+	}
+
+	nodes := []types.NodeName{}
+	for k := range volumeObj.nodesAttachedTo {
+		nodes = append(nodes, k)
+	}
+	return nodes
+}
+
 func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
 	asw.RLock()
 	defer asw.RUnlock()
```
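A side note on the hunk above: `GetNodesForVolume` copies the keys of the internal `nodesAttachedTo` map into a fresh slice while holding the read lock, so callers never get a reference to controller-internal state. A generic sketch of that pattern, with illustrative stand-in types:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is an illustrative stand-in for the actual-state-of-world cache.
type registry struct {
	sync.RWMutex
	attachedTo map[string]struct{}
}

// nodes takes the read lock and copies the map keys into a new slice, so the
// caller can neither observe nor mutate the internal map after the lock is
// released.
func (r *registry) nodes() []string {
	r.RLock()
	defer r.RUnlock()

	out := []string{}
	for k := range r.attachedTo {
		out = append(out, k)
	}
	return out // order is unspecified: Go map iteration is randomized
}

func main() {
	r := &registry{attachedTo: map[string]struct{}{"node-a": {}, "node-b": {}}}
	fmt.Println(len(r.nodes())) // 2
}
```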
```diff
@@ -13,10 +13,11 @@ go_library(
     srcs = ["reconciler.go"],
     tags = ["automanaged"],
     deps = [
         "//pkg/api/v1:go_default_library",
         "//pkg/controller/volume/attachdetach/cache:go_default_library",
         "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
         "//pkg/volume/util/nestedpendingoperations:go_default_library",
+        "//pkg/volume:go_default_library",
         "//pkg/volume/util/operationexecutor:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
```
```diff
@@ -25,10 +25,11 @@ import (
 
 	"github.com/golang/glog"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
 	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
+	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
 )
```
```diff
@@ -125,6 +126,41 @@ func (rc *reconciler) syncStates() {
 	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
 }
 
+// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
+// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
+// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
+// attacher to fail fast in such cases.
+// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
+func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
+	if volumeSpec.Volume != nil {
+		// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
+		if volumeSpec.Volume.AzureDisk != nil ||
+			volumeSpec.Volume.Cinder != nil {
+			return true
+		}
+	}
+
+	// Only if this volume is a persistent volume do we have reliable information on whether it's allowed or not to
+	// multi-attach. We trust the individual volume implementations to not allow unsupported access modes
+	if volumeSpec.PersistentVolume != nil {
+		if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
+			// No access mode specified so we don't know for sure. Let the attacher fail if needed
+			return false
+		}
+
+		// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
+		for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
+			if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
+				return false
+			}
+		}
+		return true
+	}
+
+	// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
+	return false
+}
+
 func (rc *reconciler) reconcile() {
 	// Detaches are triggered before attaches so that volumes referenced by
 	// pods that are rescheduled to a different node are detached first.
```
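To make the decision rules of `isMultiAttachForbidden` concrete, here is a hedged, table-style sketch of its access-mode branch using local stand-in types (the real function consumes a `*volume.Spec`; everything below is illustrative):

```go
package main

import "fmt"

type accessMode string

const (
	readWriteOnce accessMode = "ReadWriteOnce"
	readOnlyMany  accessMode = "ReadOnlyMany"
	readWriteMany accessMode = "ReadWriteMany"
)

// forbidden reimplements the access-mode branch: any *Many mode permits
// multi-attach; an all-exclusive list forbids it; an empty list is unknown,
// so we optimistically allow and let the attacher fail fast.
func forbidden(modes []accessMode) bool {
	if len(modes) == 0 {
		return false
	}
	for _, m := range modes {
		if m == readWriteMany || m == readOnlyMany {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(forbidden([]accessMode{readWriteOnce})) // true: exclusive volume
	fmt.Println(forbidden([]accessMode{readWriteMany})) // false: shared volume
	fmt.Println(forbidden(nil))                         // false: unknown, attacher decides
}
```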
```diff
@@ -133,6 +169,16 @@ func (rc *reconciler) reconcile() {
 	for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
 		if !rc.desiredStateOfWorld.VolumeExists(
 			attachedVolume.VolumeName, attachedVolume.NodeName) {
+
+			// Don't even try to start an operation if there is already one running
+			// This check must be done before we do any other checks, as otherwise the other checks
+			// may pass while at the same time the volume leaves the pending state, resulting in
+			// double detach attempts
+			if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
+				glog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
+				continue
+			}
+
 			// Set the detach request time
 			elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
 			if err != nil {
```
```diff
@@ -177,10 +223,8 @@ func (rc *reconciler) reconcile() {
 				glog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
 			}
 		}
-		if err != nil &&
-			!nestedpendingoperations.IsAlreadyExists(err) &&
-			!exponentialbackoff.IsExponentialBackoff(err) {
-			// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
+		if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
+			// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
 			// Log all other errors.
 			glog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
 		}
```
```diff
@@ -195,16 +239,28 @@ func (rc *reconciler) reconcile() {
 			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
 			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
 		} else {
+			// Don't even try to start an operation if there is already one running
+			if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
+				glog.V(10).Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
+				continue
+			}
+
+			if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
+				nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
+				if len(nodes) > 0 {
+					glog.V(4).Infof("Volume %q is already exclusively attached to node %q and can't be attached to %q", volumeToAttach.VolumeName, nodes, volumeToAttach.NodeName)
+					continue
+				}
+			}
+
 			// Volume/Node doesn't exist, spawn a goroutine to attach it
 			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
 			err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
 			if err == nil {
 				glog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
 			}
-			if err != nil &&
-				!nestedpendingoperations.IsAlreadyExists(err) &&
-				!exponentialbackoff.IsExponentialBackoff(err) {
-				// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
+			if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
+				// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
 				glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
 			}
```
```diff
@@ -283,7 +283,9 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
 	}
 
 	// Act
-	go reconciler.Run(wait.NeverStop)
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
 
 	// Assert
 	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
```
```diff
@@ -313,6 +315,183 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
 	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
 }
 
+// Creates a volume with accessMode ReadWriteMany
+// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
+// Calls Run()
+// Verifies there are two attach calls and no detach calls.
+// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there is one detach call and no (new) attach calls.
+// Deletes the second node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there are two detach calls and no (new) attach calls.
+func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.T) {
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
+	podName1 := "pod-uid1"
+	podName2 := "pod-uid2"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}
+	nodeName1 := k8stypes.NodeName("node-name1")
+	nodeName2 := k8stypes.NodeName("node-name2")
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+	_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
+	if len(nodesForVolume) != 2 {
+		t.Fatal("Volume was not attached to both nodes")
+	}
+
+	// Act
+	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+	volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName1)
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podName1,
+			generatedVolumeName,
+			nodeName1)
+	}
+
+	// Assert -- Timer will trigger detach
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+
+	// Act
+	dsw.DeletePod(types.UniquePodName(podName2), generatedVolumeName, nodeName2)
+	volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName2)
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podName2,
+			generatedVolumeName,
+			nodeName2)
+	}
+
+	// Assert -- Timer will trigger detach
+	waitForNewDetacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 2 /* expectedDetachCallCount */, fakePlugin)
+}
+
+// Creates a volume with accessMode ReadWriteOnce
+// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
+// Calls Run()
+// Verifies there is one attach call and no detach calls.
+// Deletes the node/volume/pod tuple from desiredStateOfWorld which succeeded in attaching
+// Verifies there are two attach calls and one detach call.
+func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.T) {
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
+	podName1 := "pod-uid1"
+	podName2 := "pod-uid2"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
+	nodeName1 := k8stypes.NodeName("node-name1")
+	nodeName2 := k8stypes.NodeName("node-name2")
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+	// Add both pods at the same time to provoke a potential race condition in the reconciler
+	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+	_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
+	if len(nodesForVolume) == 0 {
+		t.Fatal("Volume was not attached to any node")
+	} else if len(nodesForVolume) != 1 {
+		t.Fatal("Volume was attached to multiple nodes")
+	}
+
+	// Act
+	podToDelete := ""
+	if nodesForVolume[0] == nodeName1 {
+		podToDelete = podName1
+	} else if nodesForVolume[0] == nodeName2 {
+		podToDelete = podName2
+	} else {
+		t.Fatal("Volume attached to unexpected node")
+	}
+
+	dsw.DeletePod(types.UniquePodName(podToDelete), generatedVolumeName, nodesForVolume[0])
+	volumeExists := dsw.VolumeExists(generatedVolumeName, nodesForVolume[0])
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podToDelete,
+			generatedVolumeName,
+			nodesForVolume[0])
+	}
+
+	// Assert
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+	waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+}
+
 func waitForNewAttacherCallCount(
 	t *testing.T,
 	expectedCallCount int,
```
```diff
@@ -404,6 +583,40 @@ func waitForAttachCallCount(
 	}
 }
 
+func waitForTotalAttachCallCount(
+	t *testing.T,
+	expectedAttachCallCount int,
+	fakePlugin *volumetesting.FakeVolumePlugin) {
+	if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
+		return
+	}
+
+	err := retryWithExponentialBackOff(
+		time.Duration(5*time.Millisecond),
+		func() (bool, error) {
+			totalCount := 0
+			for _, attacher := range fakePlugin.GetAttachers() {
+				totalCount += attacher.GetAttachCallCount()
+			}
+			if totalCount == expectedAttachCallCount {
+				return true, nil
+			}
+			t.Logf(
+				"Warning: Wrong total GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
+				expectedAttachCallCount,
+				totalCount)
+
+			return false, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf(
+			"Total AttachCallCount does not match expected value. Expected: <%v>",
+			expectedAttachCallCount)
+	}
+}
+
 func waitForDetachCallCount(
 	t *testing.T,
 	expectedDetachCallCount int,
```
```diff
@@ -441,6 +654,40 @@ func waitForDetachCallCount(
 	}
 }
 
+func waitForTotalDetachCallCount(
+	t *testing.T,
+	expectedDetachCallCount int,
+	fakePlugin *volumetesting.FakeVolumePlugin) {
+	if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
+		return
+	}
+
+	err := retryWithExponentialBackOff(
+		time.Duration(5*time.Millisecond),
+		func() (bool, error) {
+			totalCount := 0
+			for _, detacher := range fakePlugin.GetDetachers() {
+				totalCount += detacher.GetDetachCallCount()
+			}
+			if totalCount == expectedDetachCallCount {
+				return true, nil
+			}
+			t.Logf(
+				"Warning: Wrong total GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
+				expectedDetachCallCount,
+				totalCount)
+
+			return false, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf(
+			"Total DetachCallCount does not match expected value. Expected: <%v>",
+			expectedDetachCallCount)
+	}
+}
+
 func verifyNewAttacherCallCount(
 	t *testing.T,
 	expectZeroNewAttacherCallCount bool,
```
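The `waitForTotal*` helpers above poll the fake plugin's call counters until they converge, retrying with exponential backoff via `retryWithExponentialBackOff` (a helper defined elsewhere in this test file) instead of sleeping for a fixed interval. A minimal standard-library sketch of that polling pattern, with illustrative names:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil retries cond with doubling delays until it returns true or the
// attempt budget is exhausted -- a stand-in for the test file's
// retryWithExponentialBackOff helper, not its actual implementation.
func pollUntil(initial time.Duration, attempts int, cond func() bool) error {
	delay := initial
	for i := 0; i < attempts; i++ {
		if cond() {
			return nil
		}
		time.Sleep(delay)
		delay *= 2
	}
	return errors.New("condition not met before retries were exhausted")
}

func main() {
	count := 0
	err := pollUntil(5*time.Millisecond, 10, func() bool {
		count++ // stands in for re-reading the fake plugin's call counters
		return count == 3
	})
	fmt.Println(err) // <nil>
}
```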
```diff
@@ -48,6 +48,13 @@ func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.
 				},
 			},
 		},
+		PersistentVolume: &v1.PersistentVolume{
+			Spec: v1.PersistentVolumeSpec{
+				AccessModes: []v1.PersistentVolumeAccessMode{
+					v1.ReadWriteOnce,
+				},
+			},
+		},
 	}
 }
 
```