From 773ac20880b10cb4bb3eeb35e7552310805dd8e4 Mon Sep 17 00:00:00 2001 From: saadali Date: Tue, 21 Jun 2016 21:47:52 -0700 Subject: [PATCH] Prevent detach before node status update --- .../volume/reconciler/reconciler.go | 19 ++++- .../volume/reconciler/reconciler_test.go | 81 ++++++++++++++++--- .../statusupdater/fake_node_status_updater.go | 39 +++++++++ 3 files changed, 123 insertions(+), 16 deletions(-) create mode 100644 pkg/controller/volume/statusupdater/fake_node_status_updater.go diff --git a/pkg/controller/volume/reconciler/reconciler.go b/pkg/controller/volume/reconciler/reconciler.go index efb9e9ad08f..b948ee259b4 100644 --- a/pkg/controller/volume/reconciler/reconciler.go +++ b/pkg/controller/volume/reconciler/reconciler.go @@ -92,6 +92,21 @@ func (rc *reconciler) reconciliationLoopFunc() func() { if !rc.desiredStateOfWorld.VolumeExists( attachedVolume.VolumeName, attachedVolume.NodeName) { // Volume exists in actual state of world but not desired + + // Mark desire to detach + timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName) + if err != nil { + glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err) + } + + // Update Node Status to indicate volume is no longer safe to mount. + err = rc.nodeStatusUpdater.UpdateNodeStatuses() + if err != nil { + // Skip detaching this volume if unable to update node status + glog.Infof("UpdateNodeStatuses failed with: %v", err) + continue + } + if !attachedVolume.MountedByNode { glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName) err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld) @@ -112,10 +127,6 @@ func (rc *reconciler) reconciliationLoopFunc() func() { } } else { // If volume is not safe to detach (is mounted) wait a max amount of time before detaching any way. - timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName) - if err != nil { - glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err) - } if timeElapsed > rc.maxWaitForUnmountDuration { glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName) err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld) diff --git a/pkg/controller/volume/reconciler/reconciler_test.go b/pkg/controller/volume/reconciler/reconciler_test.go index e79d63f7828..3563cebdf0c 100644 --- a/pkg/controller/volume/reconciler/reconciler_test.go +++ b/pkg/controller/volume/reconciler/reconciler_test.go @@ -75,10 +75,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) - nodeInformer := informers.CreateSharedNodeIndexInformer( - fakeKubeClient, resyncPeriod) - nsu := statusupdater.NewNodeStatusUpdater( - fakeKubeClient, nodeInformer, asw) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) podName := "pod-uid" @@ -121,10 +118,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) - nodeInformer := informers.CreateSharedNodeIndexInformer( - fakeKubeClient, resyncPeriod) - nsu := statusupdater.NewNodeStatusUpdater( - fakeKubeClient, nodeInformer, asw) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) podName := "pod-uid" @@ -188,10 +182,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test asw := cache.NewActualStateOfWorld(volumePluginMgr) fakeKubeClient := controllervolumetesting.CreateTestClient() ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) - nodeInformer := informers.CreateSharedNodeIndexInformer( - fakeKubeClient, resyncPeriod) - nsu := statusupdater.NewNodeStatusUpdater( - fakeKubeClient, nodeInformer, asw) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) podName := "pod-uid" @@ -241,6 +232,72 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin) } +// Populates desiredStateOfWorld cache with one node/volume/pod tuple. +// Has node update fail +// Calls Run() +// Verifies there is one attach call and no detach calls. +// Marks the node/volume as unmounted. +// Deletes the node/volume/pod tuple from desiredStateOfWorld cache. +// Verifies there are NO detach call and no (new) attach calls. +func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail(t *testing.T) { + // Arrange + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := controllervolumetesting.CreateTestClient() + ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr) + nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) + reconciler := NewReconciler( + reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu) + podName := "pod-uid" + volumeName := api.UniqueVolumeName("volume-name") + volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) + nodeName := "node-name" + dsw.AddNode(nodeName) + volumeExists := dsw.VolumeExists(volumeName, nodeName) + if volumeExists { + t.Fatalf( + "Volume %q/node %q should not exist, but it does.", + volumeName, + nodeName) + } + + generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName) + if podAddErr != nil { + t.Fatalf("AddPod failed. Expected: Actual: <%v>", podAddErr) + } + + // Act + go reconciler.Run(wait.NeverStop) + + // Assert + waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin) + verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin) + waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin) + verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) + waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) + + // Act + dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName) + volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName) + if volumeExists { + t.Fatalf( + "Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.", + podName, + generatedVolumeName, + nodeName) + } + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */) + asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */) + + // Assert + verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) + verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin) + waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin) + verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin) + waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) +} + func waitForNewAttacherCallCount( t *testing.T, expectedCallCount int, diff --git a/pkg/controller/volume/statusupdater/fake_node_status_updater.go b/pkg/controller/volume/statusupdater/fake_node_status_updater.go new file mode 100644 index 00000000000..dc734e2ff0d --- /dev/null +++ b/pkg/controller/volume/statusupdater/fake_node_status_updater.go @@ -0,0 +1,39 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statusupdater + +import ( + "fmt" +) + +func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater { + return &fakeNodeStatusUpdater{ + returnError: returnError, + } +} + +type fakeNodeStatusUpdater struct { + returnError bool +} + +func (fnsu *fakeNodeStatusUpdater) UpdateNodeStatuses() error { + if fnsu.returnError { + return fmt.Errorf("fake error on update node status") + } + + return nil +}