Don't force detach volume from healthy nodes

6 minute force-deatch timeout should be used only for nodes that are not
healthy. 

In case a CSI driver is being upgraded or it's simply slow, NodeUnstage
can take more than 6 minutes. In that case, Pod is already deleted from the
API server and thus A/D controller will force-detach a mounted volume,
possibly corrupting the volume and breaking CSI - a CSI driver expects
NodeUnstage to succeed before Kubernetes can call ControllerUnpublish.
This commit is contained in:
Jan Safranek 2022-06-22 14:27:50 +02:00
parent 18b5efceda
commit 3b94ac228a
2 changed files with 117 additions and 2 deletions

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
@ -154,6 +155,15 @@ func (rc *reconciler) hasOutOfServiceTaint(nodeName types.NodeName) (bool, error
return false, nil
}
// nodeIsHealthy returns true if the node looks healthy.
func (rc *reconciler) nodeIsHealthy(nodeName types.NodeName) (bool, error) {
node, err := rc.nodeLister.Get(string(nodeName))
if err != nil {
return false, err
}
return nodeutil.IsNodeReady(node), nil
}
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
@ -204,14 +214,22 @@ func (rc *reconciler) reconcile() {
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration
isHealthy, err := rc.nodeIsHealthy(attachedVolume.NodeName)
if err != nil {
klog.Errorf("failed to get health of node %s: %s", attachedVolume.NodeName, err.Error())
}
// Force detach volumes from unhealthy nodes after maxWaitForUnmountDuration.
forceDetach := !isHealthy && timeout
hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName)
if err != nil {
klog.Errorf("failed to get taint specs for node %s: %s", attachedVolume.NodeName, err.Error())
}
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
// Check whether volume is still mounted. Skip detach if it is still mounted unless force detach timeout
// or the node has `node.kubernetes.io/out-of-service` taint.
if attachedVolume.MountedByNode && !timeout && !hasOutOfServiceTaint {
if attachedVolume.MountedByNode && !forceDetach && !hasOutOfServiceTaint {
klog.V(5).InfoS("Cannot detach volume because it is still mounted", "volume", attachedVolume)
continue
}

View File

@ -235,6 +235,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
nodeName := k8stypes.NodeName("node-name")
dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
volumeExists := dsw.VolumeExists(volumeName, nodeName)
if volumeExists {
t.Fatalf(
@ -936,6 +937,102 @@ func Test_Run_OneVolumeDetachOnNoOutOfServiceTaintedNode(t *testing.T) {
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
}
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
// The node starts as healthy.
//
// Calls Run()
// Verifies there is one attach call and no detach calls.
// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
// Verifies that the volume is NOT detached after maxWaitForUnmountDuration.
// Marks the node as unhealthy.
// Verifies that the volume is detached after maxWaitForUnmountDuration.
func Test_Run_OneVolumeDetachOnUnhealthyNode(t *testing.T) {
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler))
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
nodeLister := informerFactory.Core().V1().Nodes().Lister()
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad,
nsu, nodeLister, fakeRecorder)
podName1 := "pod-uid1"
volumeName1 := v1.UniqueVolumeName("volume-name1")
volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
nodeName1 := k8stypes.NodeName("worker-0")
node1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
}
informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1)
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
volumeExists := dsw.VolumeExists(volumeName1, nodeName1)
if volumeExists {
t.Fatalf(
"Volume %q/node %q should not exist, but it does.",
volumeName1,
nodeName1)
}
generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1,
podName1), volumeSpec1, nodeName1)
if podErr != nil {
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
}
// Act
ch := make(chan struct{})
go reconciler.Run(ch)
defer close(ch)
// Assert
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
// Delete the pod and the volume will be detached even after the maxWaitForUnmountDuration expires as volume is
// not unmounted and the node is healthy.
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
time.Sleep(maxWaitForUnmountDuration * 5)
// Assert
waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
// Act
// Mark the node unhealthy
node2 := node1.DeepCopy()
node2.Status.Conditions[0].Status = v1.ConditionFalse
informerFactory.Core().V1().Nodes().Informer().GetStore().Update(node2)
// Assert -- Detach was triggered after maxWaitForUnmountDuration
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
}
func Test_ReportMultiAttachError(t *testing.T) {
type nodeWithPods struct {
name k8stypes.NodeName