From 06baeb33b27ae0f4d9fb38cc5f26bf5fb0468f72 Mon Sep 17 00:00:00 2001
From: Alexander Block
Date: Wed, 25 Jan 2017 18:40:41 +0100
Subject: [PATCH] Don't try to attach volumes which are already attached to
 other nodes

---
 .../cache/actual_state_of_world.go            |  19 ++
 .../volume/attachdetach/reconciler/BUILD      |   3 +-
 .../attachdetach/reconciler/reconciler.go     |  74 +++++-
 .../reconciler/reconciler_test.go             | 249 +++++++++++++++++-
 .../attachdetach/testing/testvolumespec.go    |   7 +
 5 files changed, 341 insertions(+), 11 deletions(-)

diff --git a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
index c1c4ad5d261..bece129d395 100644
--- a/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
+++ b/pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
@@ -111,6 +111,9 @@ type ActualStateOfWorld interface {
 	GetAttachedVolumesPerNode() map[types.NodeName][]operationexecutor.AttachedVolume
 
+	// GetNodesForVolume returns the nodes on which the volume is attached
+	GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName
+
 	// GetVolumesToReportAttached returns a map containing the set of nodes for
 	// which the VolumesAttached Status field in the Node API object should be
 	// updated. The key in this map is the name of the node to update and the
@@ -581,6 +584,22 @@ func (asw *actualStateOfWorld) GetAttachedVolumesPerNode() map[types.NodeName][]
 	return attachedVolumesPerNode
 }
 
+func (asw *actualStateOfWorld) GetNodesForVolume(volumeName v1.UniqueVolumeName) []types.NodeName {
+	asw.RLock()
+	defer asw.RUnlock()
+
+	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
+	if !volumeExists || len(volumeObj.nodesAttachedTo) == 0 {
+		return []types.NodeName{}
+	}
+
+	nodes := []types.NodeName{}
+	for k := range volumeObj.nodesAttachedTo {
+		nodes = append(nodes, k)
+	}
+	return nodes
+}
+
 func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[types.NodeName][]v1.AttachedVolume {
 	asw.RLock()
 	defer asw.RUnlock()
diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD
index a8d06db06a9..e537530f305 100644
--- a/pkg/controller/volume/attachdetach/reconciler/BUILD
+++ b/pkg/controller/volume/attachdetach/reconciler/BUILD
@@ -13,10 +13,11 @@ go_library(
     srcs = ["reconciler.go"],
     tags = ["automanaged"],
     deps = [
+        "//pkg/api/v1:go_default_library",
         "//pkg/controller/volume/attachdetach/cache:go_default_library",
         "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
-        "//pkg/volume/util/nestedpendingoperations:go_default_library",
+        "//pkg/volume:go_default_library",
         "//pkg/volume/util/operationexecutor:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
index 9cbbfe586d6..ae0a895387f 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -25,10 +25,11 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
-	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" + "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -125,6 +126,41 @@ func (rc *reconciler) syncStates() { rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) } +// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. +// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns +// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the +// attacher to fail fast in such cases. +// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 +func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool { + if volumeSpec.Volume != nil { + // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach + if volumeSpec.Volume.AzureDisk != nil || + volumeSpec.Volume.Cinder != nil { + return true + } + } + + // Only if this volume is a persistent volume, we have reliable information on wether it's allowed or not to + // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes + if volumeSpec.PersistentVolume != nil { + if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { + // No access mode specified so we don't know for sure. Let the attacher fail if needed + return false + } + + // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false + for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { + if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { + return false + } + } + return true + } + + // we don't know if it's supported or not and let the attacher fail later in cases it's not supported + return false +} + func (rc *reconciler) reconcile() { // Detaches are triggered before attaches so that volumes referenced by // pods that are rescheduled to a different node are detached first. @@ -133,6 +169,16 @@ func (rc *reconciler) reconcile() { for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { if !rc.desiredStateOfWorld.VolumeExists( attachedVolume.VolumeName, attachedVolume.NodeName) { + + // Don't even try to start an operation if there is already one running + // This check must be done before we do any other checks, as otherwise the other checks + // may pass while at the same time the volume leaves the pending state, resulting in + // double detach attempts + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { + glog.V(10).Infof("Operation for volume %q is already running. 
+				continue
+			}
+
 			// Set the detach request time
 			elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
 			if err != nil {
@@ -177,10 +223,8 @@
 				glog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
 			}
 		}
-		if err != nil &&
-			!nestedpendingoperations.IsAlreadyExists(err) &&
-			!exponentialbackoff.IsExponentialBackoff(err) {
-			// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
+		if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
+			// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
 			// Log all other errors.
 			glog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
 		}
@@ -195,16 +239,28 @@
 			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
 			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
 		} else {
+			// Don't even try to start an operation if there is already one running
+			if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
+				glog.V(10).Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
+				continue
+			}
+
+			if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
+				nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
+				if len(nodes) > 0 {
+					glog.V(4).Infof("Volume %q is already exclusively attached to node %q and can't be attached to %q", volumeToAttach.VolumeName, nodes, volumeToAttach.NodeName)
+					continue
+				}
+			}
+
 			// Volume/Node doesn't exist, spawn a goroutine to attach it
 			glog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
 			err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
 			if err == nil {
 				glog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
 			}
-			if err != nil &&
-				!nestedpendingoperations.IsAlreadyExists(err) &&
-				!exponentialbackoff.IsExponentialBackoff(err) {
-				// Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected.
+			if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
+				// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
 				// Log all other errors.
 				glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
 			}
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
index 7106caa0917..c1118898376 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
@@ -283,7 +283,9 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
 	}
 
 	// Act
-	go reconciler.Run(wait.NeverStop)
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
 
 	// Assert
 	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
@@ -313,6 +315,183 @@
 	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
 }
 
+// Creates a volume with accessMode ReadWriteMany
+// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
+// Calls Run()
+// Verifies there are two attach calls and no detach calls.
+// Deletes the first node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there is one detach call and no (new) attach calls.
+// Deletes the second node/volume/pod tuple from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there are two detach calls and no (new) attach calls.
+func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.T) {
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
+	podName1 := "pod-uid1"
+	podName2 := "pod-uid2"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}
+	nodeName1 := k8stypes.NodeName("node-name1")
+	nodeName2 := k8stypes.NodeName("node-name2")
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+	_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
+	if len(nodesForVolume) != 2 {
+		t.Fatal("Volume was not attached to both nodes")
+	}
+
+	// Act
+	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+	volumeExists := dsw.VolumeExists(generatedVolumeName, nodeName1)
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podName1,
+			generatedVolumeName,
+			nodeName1)
+	}
+
+	// Assert -- Timer will trigger detach
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+
+	// Act
+	dsw.DeletePod(types.UniquePodName(podName2), generatedVolumeName, nodeName2)
+	volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName2)
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podName2,
+			generatedVolumeName,
+			nodeName2)
+	}
+
+	// Assert -- Timer will trigger detach
+	waitForNewDetacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 2 /* expectedDetachCallCount */, fakePlugin)
+}
+
+// Creates a volume with accessMode ReadWriteOnce
+// Populates desiredStateOfWorld cache with two node/volume/pod tuples pointing to the created volume
+// Calls Run()
+// Verifies there is one attach call and no detach calls.
+// Deletes the node/volume/pod tuple from desiredStateOfWorld which succeeded in attaching
+// Verifies there are two attach calls and one detach call.
+func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.T) {
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
+	podName1 := "pod-uid1"
+	podName2 := "pod-uid2"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
+	nodeName1 := k8stypes.NodeName("node-name1")
+	nodeName2 := k8stypes.NodeName("node-name2")
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	dsw.AddNode(nodeName2, false /*keepTerminatedPodVolumes*/)
+
+	// Add both pods at the same time to provoke a potential race condition in the reconciler
+	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, podName1), volumeSpec, nodeName1)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+	_, podAddErr = dsw.AddPod(types.UniquePodName(podName2), controllervolumetesting.NewPod(podName2, podName2), volumeSpec, nodeName2)
+	if podAddErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	nodesForVolume := asw.GetNodesForVolume(generatedVolumeName)
+	if len(nodesForVolume) == 0 {
+		t.Fatal("Volume was not attached to any node")
+	} else if len(nodesForVolume) != 1 {
+		t.Fatal("Volume was attached to multiple nodes")
+	}
+
+	// Act
+	podToDelete := ""
+	if nodesForVolume[0] == nodeName1 {
+		podToDelete = podName1
+	} else if nodesForVolume[0] == nodeName2 {
+		podToDelete = podName2
+	} else {
+		t.Fatal("Volume attached to unexpected node")
+	}
+
+	dsw.DeletePod(types.UniquePodName(podToDelete), generatedVolumeName, nodesForVolume[0])
+	volumeExists := dsw.VolumeExists(generatedVolumeName, nodesForVolume[0])
+	if volumeExists {
+		t.Fatalf(
+			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
+			podToDelete,
+			generatedVolumeName,
+			nodesForVolume[0])
+	}
+
+	// Assert
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForTotalDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+	waitForNewAttacherCallCount(t, 2 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
+}
+
 func waitForNewAttacherCallCount(
 	t *testing.T,
 	expectedCallCount int,
@@ -404,6 +583,40 @@ func waitForAttachCallCount(
 	}
 }
 
+func waitForTotalAttachCallCount(
+	t *testing.T,
+	expectedAttachCallCount int,
+	fakePlugin *volumetesting.FakeVolumePlugin) {
+	if len(fakePlugin.GetAttachers()) == 0 && expectedAttachCallCount == 0 {
+		return
+	}
+
+	err := retryWithExponentialBackOff(
+		time.Duration(5*time.Millisecond),
+		func() (bool, error) {
+			totalCount := 0
+			for _, attacher := range fakePlugin.GetAttachers() {
+				totalCount += attacher.GetAttachCallCount()
+			}
+			if totalCount == expectedAttachCallCount {
+				return true, nil
+			}
+			t.Logf(
+				"Warning: Wrong total GetAttachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
+				expectedAttachCallCount,
+				totalCount)
+
+			return false, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf(
+			"Total AttachCallCount does not match expected value. Expected: <%v>",
+			expectedAttachCallCount)
+	}
+}
+
 func waitForDetachCallCount(
 	t *testing.T,
 	expectedDetachCallCount int,
@@ -441,6 +654,40 @@ func waitForDetachCallCount(
 	}
 }
 
+func waitForTotalDetachCallCount(
+	t *testing.T,
+	expectedDetachCallCount int,
+	fakePlugin *volumetesting.FakeVolumePlugin) {
+	if len(fakePlugin.GetDetachers()) == 0 && expectedDetachCallCount == 0 {
+		return
+	}
+
+	err := retryWithExponentialBackOff(
+		time.Duration(5*time.Millisecond),
+		func() (bool, error) {
+			totalCount := 0
+			for _, detacher := range fakePlugin.GetDetachers() {
+				totalCount += detacher.GetDetachCallCount()
+			}
+			if totalCount == expectedDetachCallCount {
+				return true, nil
+			}
+			t.Logf(
+				"Warning: Wrong total GetDetachCallCount(). Expected: <%v> Actual: <%v>. Will retry.",
+				expectedDetachCallCount,
+				totalCount)
+
+			return false, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf(
+			"Total DetachCallCount does not match expected value. Expected: <%v>",
+			expectedDetachCallCount)
+	}
+}
+
 func verifyNewAttacherCallCount(
 	t *testing.T,
 	expectZeroNewAttacherCallCount bool,
diff --git a/pkg/controller/volume/attachdetach/testing/testvolumespec.go b/pkg/controller/volume/attachdetach/testing/testvolumespec.go
index 2b954e6b79a..7097632dc1a 100644
--- a/pkg/controller/volume/attachdetach/testing/testvolumespec.go
+++ b/pkg/controller/volume/attachdetach/testing/testvolumespec.go
@@ -48,6 +48,13 @@ func GetTestVolumeSpec(volumeName string, diskName v1.UniqueVolumeName) *volume.
 				},
 			},
 		},
+		PersistentVolume: &v1.PersistentVolume{
+			Spec: v1.PersistentVolumeSpec{
+				AccessModes: []v1.PersistentVolumeAccessMode{
+					v1.ReadWriteOnce,
+				},
+			},
+		},
 	}
 }
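
For illustration, the core decision the patch adds to the reconciler can be reduced to the access-mode check sketched below. This is a minimal, self-contained Go sketch, not the real code: AccessMode and its constants here are simplified stand-ins for v1.PersistentVolumeAccessMode, and the actual isMultiAttachForbidden above additionally operates on a full *volume.Spec and special-cases the AzureDisk and Cinder in-tree volume types.

package main

import "fmt"

// AccessMode is a simplified stand-in for v1.PersistentVolumeAccessMode.
type AccessMode string

const (
	ReadWriteOnce AccessMode = "ReadWriteOnce"
	ReadOnlyMany  AccessMode = "ReadOnlyMany"
	ReadWriteMany AccessMode = "ReadWriteMany"
)

// isMultiAttachForbidden mirrors the persistent-volume branch of the
// reconciler's check: only modes that explicitly allow many consumers
// (ReadWriteMany or ReadOnlyMany) permit attaching to a second node.
// An empty mode list means "unknown", in which case the reconciler
// lets the attacher fail fast instead of deciding here.
func isMultiAttachForbidden(accessModes []AccessMode) bool {
	if len(accessModes) == 0 {
		return false // unknown -- defer to the attacher
	}
	for _, mode := range accessModes {
		if mode == ReadWriteMany || mode == ReadOnlyMany {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isMultiAttachForbidden([]AccessMode{ReadWriteOnce})) // true: exclusive
	fmt.Println(isMultiAttachForbidden([]AccessMode{ReadWriteMany})) // false: shared
	fmt.Println(isMultiAttachForbidden(nil))                         // false: unknown
}

When this returns true and the actual state of world already lists a node for the volume (via the new GetNodesForVolume), the reconcile loop skips the attach entirely rather than starting an operation that would fail slowly or risk corrupting a volume that cannot be multi-attached; note that the pending-operation check runs first, so an in-flight attach or detach is never raced.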