Merge pull request #27837 from saad-ali/blockKubeletDetachFix

Automatic merge from submit-queue

Prevent detach before node status update

This PR prevents the attach/detach controller from starting a detach operation before the node status has been updated (to remove the volume from the node's list of attached volumes).

Fixes https://github.com/kubernetes/kubernetes/issues/27836
This commit is contained in:
k8s-merge-robot 2016-06-22 10:10:58 -07:00 committed by GitHub
commit 5289de0501
3 changed files with 123 additions and 16 deletions

View File

@ -92,6 +92,21 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
// Volume exists in actual state of world but not desired
// Mark desire to detach
timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err)
}
// Update Node Status to indicate volume is no longer safe to mount.
err = rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
// Skip detaching this volume if unable to update node status
glog.Infof("UpdateNodeStatuses failed with: %v", err)
continue
}
if !attachedVolume.MountedByNode {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)
@ -112,10 +127,6 @@ func (rc *reconciler) reconciliationLoopFunc() func() {
}
} else {
// If volume is not safe to detach (is mounted) wait a max amount of time before detaching any way.
timeElapsed, err := rc.actualStateOfWorld.MarkDesireToDetach(attachedVolume.VolumeName, attachedVolume.NodeName)
if err != nil {
glog.Errorf("Unexpected error actualStateOfWorld.MarkDesireToDetach(): %v", err)
}
if timeElapsed > rc.maxWaitForUnmountDuration {
glog.V(5).Infof("Attempting to start DetachVolume for volume %q from node %q. Volume is not safe to detach, but maxWaitForUnmountDuration expired.", attachedVolume.VolumeName, attachedVolume.NodeName)
err := rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, rc.actualStateOfWorld)

View File

@ -75,10 +75,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr)
nodeInformer := informers.CreateSharedNodeIndexInformer(
fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := "pod-uid"
@ -121,10 +118,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr)
nodeInformer := informers.CreateSharedNodeIndexInformer(
fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := "pod-uid"
@ -188,10 +182,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr)
nodeInformer := informers.CreateSharedNodeIndexInformer(
fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
podName := "pod-uid"
@ -241,6 +232,72 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
}
// Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail
// checks that the reconciler does not start a detach while the node status
// update keeps failing.
//
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// Uses a fake node status updater that always returns an error.
// Calls Run().
// Verifies there is one attach call and no detach calls.
// Deletes the node/volume/pod tuple from desiredStateOfWorld cache.
// Marks the node/volume as unmounted.
// Verifies there are NO detach calls and no (new) attach calls.
func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdateStatusFail(t *testing.T) {
	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
	asw := cache.NewActualStateOfWorld(volumePluginMgr)
	fakeKubeClient := controllervolumetesting.CreateTestClient()
	ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr)
	// The fake updater is configured to fail every UpdateNodeStatuses call.
	nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
	reconciler := NewReconciler(
		reconcilerLoopPeriod, maxWaitForUnmountDuration, dsw, asw, ad, nsu)
	podName := "pod-uid"
	volumeName := api.UniqueVolumeName("volume-name")
	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
	nodeName := "node-name"
	dsw.AddNode(nodeName)
	volumeExists := dsw.VolumeExists(volumeName, nodeName)
	if volumeExists {
		t.Fatalf(
			"Volume %q/node %q should not exist, but it does.",
			volumeName,
			nodeName)
	}

	generatedVolumeName, podAddErr := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
	if podAddErr != nil {
		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podAddErr)
	}

	// Act
	go reconciler.Run(wait.NeverStop)

	// Assert
	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)

	// Act
	dsw.DeletePod(types.UniquePodName(podName), generatedVolumeName, nodeName)
	volumeExists = dsw.VolumeExists(generatedVolumeName, nodeName)
	if volumeExists {
		t.Fatalf(
			"Deleted pod %q from volume %q/node %q. Volume should also be deleted but it still exists.",
			podName,
			generatedVolumeName,
			nodeName)
	}
	// Report the volume as mounted and then as unmounted, so it ends up
	// marked as not mounted by the node.
	// NOTE(review): presumably the "true" call is required before "false"
	// takes effect — confirm against ActualStateOfWorld.
	asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
	asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)

	// Assert
	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
	// Still expect zero detachers: with the status update failing, the volume
	// is no longer desired and is unmounted, yet no detach may be started.
	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
func waitForNewAttacherCallCount(
t *testing.T,
expectedCallCount int,

View File

@ -0,0 +1,39 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statusupdater
import (
"fmt"
)
// NewFakeNodeStatusUpdater builds a NodeStatusUpdater stub for use in tests.
// When returnError is true every UpdateNodeStatuses call on the stub fails;
// otherwise every call succeeds.
func NewFakeNodeStatusUpdater(returnError bool) NodeStatusUpdater {
	fake := new(fakeNodeStatusUpdater)
	fake.returnError = returnError
	return fake
}

// fakeNodeStatusUpdater is a stub whose outcome is fixed at construction time.
type fakeNodeStatusUpdater struct {
	// returnError selects whether UpdateNodeStatuses reports a failure.
	returnError bool
}

// UpdateNodeStatuses performs no update. It returns nil unless the stub was
// built with returnError set, in which case it returns a fixed fake error.
func (f *fakeNodeStatusUpdater) UpdateNodeStatuses() error {
	if !f.returnError {
		return nil
	}
	return fmt.Errorf("fake error on update node status")
}