Don't try to attach volumes which are already attached to other nodes

Alexander Block 2017-01-25 18:40:41 +01:00
parent 7df0178076
commit 06baeb33b2
5 changed files with 341 additions and 11 deletions

View File

@@ -111,6 +111,9 @@ type ActualStateOfWorld interface {
GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume
// GetNodesForVolume returns the nodes on which the volume is attached
GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName
// GetVolumesToReportAttached returns a map containing the set of nodes for
// which the VolumesAttached Status field in the Node API object should be
// updated. The key in this map is the name of the node to update and the
@@ -581,6 +584,22 @@ func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]
return attachedVolumesPerNode
}
func (asw *actualStateOfWorld) GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName {
asw.RLock()
defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists || len(volumeObj.nodesAttachedTo) == 0 {
return []types.NodeName{}
}
nodes := []types.NodeName{}
for k := range volumeObj.nodesAttachedTo {
nodes = append(nodes, k)
}
return nodes
}
func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
asw.RLock()
defer asw.RUnlock()

View File

@@ -13,10 +13,11 @@ go_library(
srcs = ["reconciler.go"],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//pkg/volume/util/nestedpendingoperations:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@@ -25,10 +25,11 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
@@ -125,6 +126,41 @@ func (rc *reconciler) syncStates() {
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
}
// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
// attacher to fail fast in such cases.
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
if volumeSpec.Volume != nil {
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
if volumeSpec.Volume.AzureDisk != nil ||
volumeSpec.Volume.Cinder != nil {
return true
}
}
// Only if this volume is a persistent volume do we have reliable information on whether it is
// allowed to multi-attach. We trust the individual volume implementations to not allow unsupported access modes
if volumeSpec.PersistentVolume != nil {
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
// No access mode specified so we don't know for sure. Let the attacher fail if needed
return false
}
// Check whether this volume may be attached to multiple pods/nodes; if so, return false
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
return false
}
}
return true
}
// We don't know whether multi-attach is supported, so let the attacher fail later in case it's not
return false
}
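To make the access-mode decision concrete, here is a minimal, self-contained sketch of the same logic. Plain strings stand in for v1.PersistentVolumeAccessMode, and the helper name is hypothetical, not part of this commit:

package main

import "fmt"

// multiAttachForbidden mirrors the access-mode branch above: no access modes
// means "unknown", so allow the attach and let the attacher fail fast; any
// *Many mode means multi-attach is allowed; everything else forbids it.
func multiAttachForbidden(accessModes []string) bool {
	if len(accessModes) == 0 {
		return false
	}
	for _, mode := range accessModes {
		if mode == "ReadWriteMany" || mode == "ReadOnlyMany" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(multiAttachForbidden([]string{"ReadWriteOnce"})) // true: exclusive
	fmt.Println(multiAttachForbidden([]string{"ReadWriteMany"})) // false: shared
	fmt.Println(multiAttachForbidden(nil))                       // false: unknown
}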
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
@@ -133,6 +169,16 @@ func (rc *reconciler) reconcile() {
for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Don't even try to start an operation if there is already one running
// This check must be done before we do any other checks, as otherwise the other checks
// may pass while at the same time the volume leaves the pending state, resulting in
// double detach attempts
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
glog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
continue
}
// Set the detach request time
elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
@@ -177,10 +223,8 @@ func (rc *reconciler) reconcile() {
glog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
}
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors; they are expected.
// Log all other errors.
glog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
}
@@ -195,16 +239,28 @@ func (rc *reconciler) reconcile() {
glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
} else {
// Don't even try to start an operation if there is already one running
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
glog.V(10).Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
continue
}
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
glog.V(4).Infof("Volume %q is already exclusively attached to node %q and can't be attached to %q", volumeToAttach.VolumeName, nodes, volumeToAttach.NodeName)
continue
}
}
// Volume/Node doesn't exist, spawn a goroutine to attach it
glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
glog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
}
if err != nil &&
!nestedpendingoperations.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors; they are expected.
// Log all other errors.
glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
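Taken together, the attach path now applies two guards before spawning an operation: skip if an operation for the volume is already pending, and, for single-attach volumes, skip if the volume is attached anywhere else. A condensed, self-contained sketch of that decision order (a hypothetical helper, assuming the two booleans and node list are computed as in the hunk above):

package main

import "fmt"

// shouldStartAttach condenses the reconciler's guard order. Illustration
// only; the real code consults attacherDetacher.IsOperationPending and
// actualStateOfWorld.GetNodesForVolume directly.
func shouldStartAttach(operationPending, multiAttachForbidden bool, attachedNodes []string) bool {
	if operationPending {
		return false // an attach/detach for this volume is already running
	}
	if multiAttachForbidden && len(attachedNodes) > 0 {
		return false // volume is exclusively attached to another node
	}
	return true
}

func main() {
	fmt.Println(shouldStartAttach(false, true, []string{"node-name1"})) // false
	fmt.Println(shouldStartAttach(false, true, nil))                    // true
	fmt.Println(shouldStartAttach(true, false, nil))                    // false
}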

View File

@@ -283,7 +283,9 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
}
// Act
go reconciler.Run(wait.NeverStop)
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
@@ -313,6 +315,183 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
// Creates a volume with accessMode ReadWriteMany
// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
// Calls Run()
// Verifies there are two attach calls and no detach calls.
// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies there is one detach call and no (new) attach calls.
// Deletes the second node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies there are two detach calls and no (new) attach calls.
func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName1 := "pod-uid1"
podName2 := "pod-uid2"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}
nodeName1 := k8stypes.NodeName("node-name1")
nodeName2 := k8stypes.NodeName("node-name2")
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
if len(nodesForVolume) != 2 {
t.Fatal("Volume was not attached to both nodes")
}
// Act
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName1)
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podName1,
generatedVolumeName,
nodeName1)
}
// Assert -- Timer will trigger detach
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
// Act
dsw.DeletePod(types.UniquePodName(podName2), generatedVolumeName, nodeName2)
volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName2)
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podName2,
generatedVolumeName,
nodeName2)
}
// Assert -- Timer will trigger detach
waitForNewDetacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForTotalDetachCallCount(t, 2 /* expectedDetachCallCount */, fakePlugin)
}
// Creates a volume with accessMode ReadWriteOnce
// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Deletes the node/volume/pod tuple from desiredStateOfWorld which succeeded in attaching
// Verifies there are two attach calls and one detach call.
func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName1 := "pod-uid1"
podName2 := "pod-uid2"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
nodeName1 := k8stypes.NodeName("node-name1")
nodeName2 := k8stypes.NodeName("node-name2")
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
// Add both pods at the same time to provoke a potential race condition in the reconciler
generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
if podAddErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForTotalAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
if len(nodesForVolume) == 0 {
t.Fatal("Volume was not attached to any node")
} else if len(nodesForVolume) != 1 {
t.Fatal("Volume was attached to multiple nodes")
}
// Act
podToDelete := ""
if nodesForVolume[0] == nodeName1 {
podToDelete = podName1
} else if nodesForVolume[0] == nodeName2 {
podToDelete = podName2
} else {
t.Fatal("Volume attached to unexpected node")
}
dsw.DeletePod(types.UniquePodName(podToDelete), generatedVolumeName, nodesForVolume[0])
volumeExists := dsw.VolumeExists(generatedVolumeName, nodesForVolume[0])
if volumeExists {
t.Fatalf(
"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
podToDelete,
generatedVolumeName,
nodesForVolume[0])
}
// Assert
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
}
func waitForNewAttacherCallCount(
t *testing.T,
expectedCallCount int,
@@ -404,6 +583,40 @@ func waitForAttachCallCount(
}
}
func waitForTotalAttachCallCount(
t *testing.T,
expectedAttachCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
return
}
err := retryWithExponentialBackOff(
5*time.Millisecond,
func() (bool, error) {
totalCount := 0
for _, attacher := range fakePlugin.GetAttachers() {
totalCount += attacher.GetAttachCallCount()
}
if totalCount == expectedAttachCallCount {
return true, nil
}
t.Logf(
"Warning: Wrong total GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
expectedAttachCallCount,
totalCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"Total AttachCallCount does not match expected value. Expected: <%v>",
expectedAttachCallCount)
}
}
func waitForDetachCallCount(
t *testing.T,
expectedDetachCallCount int,
@@ -441,6 +654,40 @@ func waitForDetachCallCount(
}
}
func waitForTotalDetachCallCount(
t *testing.T,
expectedDetachCallCount int,
fakePlugin *volumetesting.FakeVolumePlugin) {
if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
return
}
err := retryWithExponentialBackOff(
5*time.Millisecond,
func() (bool, error) {
totalCount := 0
for _, detacher := range fakePlugin.GetDetachers() {
totalCount += detacher.GetDetachCallCount()
}
if totalCount == expectedDetachCallCount {
return true, nil
}
t.Logf(
"Warning: Wrong total GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
expectedDetachCallCount,
totalCount)
return false, nil
},
)
if err != nil {
t.Fatalf(
"Total DetachCallCount does not match expected value. Expected: <%v>",
expectedDetachCallCount)
}
}
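Both new helpers poll the fake plugin through the test file's retryWithExponentialBackOff wrapper. Assuming that wrapper behaves like wait.ExponentialBackoff from k8s.io/apimachinery (which this package already vendors), the polling pattern looks like this minimal sketch:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	polls := 0
	backoff := wait.Backoff{
		Duration: 5 * time.Millisecond, // initial interval, doubled on each retry
		Factor:   2,
		Steps:    6, // give up after six attempts
	}
	// Poll until the (simulated) call count reaches the expected value.
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		polls++
		return polls >= 3, nil // pretend the expected count is reached on the 3rd poll
	})
	fmt.Println(polls, err) // 3 <nil>
}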
func verifyNewAttacherCallCount(
t *testing.T,
expectZeroNewAttacherCallCount bool,

View File

@@ -48,6 +48,13 @@ func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.
},
},
},
PersistentVolume: &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
},
},
}
}