mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 09:52:49 +00:00
Merge pull request #110721 from jsafrane/fix-force-detach
Don't force detach volume from healthy nodes
This commit is contained in:
commit
aefb71d7ef
@ -37,6 +37,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kevents "k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
"k8s.io/kubernetes/pkg/util/taints"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
@ -154,6 +155,15 @@ func (rc *reconciler) hasOutOfServiceTaint(nodeName types.NodeName) (bool, error
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// nodeIsHealthy returns true if the node looks healthy.
|
||||
func (rc *reconciler) nodeIsHealthy(nodeName types.NodeName) (bool, error) {
|
||||
node, err := rc.nodeLister.Get(string(nodeName))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return nodeutil.IsNodeReady(node), nil
|
||||
}
|
||||
|
||||
func (rc *reconciler) reconcile() {
|
||||
// Detaches are triggered before attaches so that volumes referenced by
|
||||
// pods that are rescheduled to a different node are detached first.
|
||||
@ -204,14 +214,22 @@ func (rc *reconciler) reconcile() {
|
||||
// Check whether timeout has reached the maximum waiting time
|
||||
timeout := elapsedTime > rc.maxWaitForUnmountDuration
|
||||
|
||||
isHealthy, err := rc.nodeIsHealthy(attachedVolume.NodeName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get health of node %s: %s", attachedVolume.NodeName, err.Error())
|
||||
}
|
||||
|
||||
// Force detach volumes from unhealthy nodes after maxWaitForUnmountDuration.
|
||||
forceDetach := !isHealthy && timeout
|
||||
|
||||
hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get taint specs for node %s: %s", attachedVolume.NodeName, err.Error())
|
||||
}
|
||||
|
||||
// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
|
||||
// Check whether volume is still mounted. Skip detach if it is still mounted unless force detach timeout
|
||||
// or the node has `node.kubernetes.io/out-of-service` taint.
|
||||
if attachedVolume.MountedByNode && !timeout && !hasOutOfServiceTaint {
|
||||
if attachedVolume.MountedByNode && !forceDetach && !hasOutOfServiceTaint {
|
||||
klog.V(5).InfoS("Cannot detach volume because it is still mounted", "volume", attachedVolume)
|
||||
continue
|
||||
}
|
||||
|
@ -235,6 +235,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
|
||||
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
|
||||
nodeName := k8stypes.NodeName("node-name")
|
||||
dsw.AddNode(nodeName, false /*keepTerminatedPodVolumes*/)
|
||||
|
||||
volumeExists := dsw.VolumeExists(volumeName, nodeName)
|
||||
if volumeExists {
|
||||
t.Fatalf(
|
||||
@ -936,6 +937,102 @@ func Test_Run_OneVolumeDetachOnNoOutOfServiceTaintedNode(t *testing.T) {
|
||||
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
|
||||
}
|
||||
|
||||
// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
|
||||
// The node starts as healthy.
|
||||
//
|
||||
// Calls Run()
|
||||
// Verifies there is one attach call and no detach calls.
|
||||
// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
|
||||
// Verifies that the volume is NOT detached after maxWaitForUnmountDuration.
|
||||
// Marks the node as unhealthy.
|
||||
// Verifies that the volume is detached after maxWaitForUnmountDuration.
|
||||
func Test_Run_OneVolumeDetachOnUnhealthyNode(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(volumePluginMgr)
|
||||
fakeKubeClient := controllervolumetesting.CreateTestClient()
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
||||
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
||||
fakeKubeClient,
|
||||
volumePluginMgr,
|
||||
fakeRecorder,
|
||||
fakeHandler))
|
||||
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
|
||||
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
|
||||
nodeLister := informerFactory.Core().V1().Nodes().Lister()
|
||||
reconciler := NewReconciler(
|
||||
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad,
|
||||
nsu, nodeLister, fakeRecorder)
|
||||
podName1 := "pod-uid1"
|
||||
volumeName1 := v1.UniqueVolumeName("volume-name1")
|
||||
volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
|
||||
nodeName1 := k8stypes.NodeName("worker-0")
|
||||
node1 := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)},
|
||||
Status: v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{
|
||||
{
|
||||
Type: v1.NodeReady,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1)
|
||||
dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
|
||||
volumeExists := dsw.VolumeExists(volumeName1, nodeName1)
|
||||
if volumeExists {
|
||||
t.Fatalf(
|
||||
"Volume %q/node %q should not exist, but it does.",
|
||||
volumeName1,
|
||||
nodeName1)
|
||||
}
|
||||
|
||||
generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1,
|
||||
podName1), volumeSpec1, nodeName1)
|
||||
if podErr != nil {
|
||||
t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
|
||||
}
|
||||
|
||||
// Act
|
||||
ch := make(chan struct{})
|
||||
go reconciler.Run(ch)
|
||||
defer close(ch)
|
||||
|
||||
// Assert
|
||||
waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
|
||||
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
|
||||
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
|
||||
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
|
||||
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
|
||||
|
||||
// Act
|
||||
// Delete the pod and the volume will be detached even after the maxWaitForUnmountDuration expires as volume is
|
||||
// not unmounted and the node is healthy.
|
||||
dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
|
||||
time.Sleep(maxWaitForUnmountDuration * 5)
|
||||
// Assert
|
||||
waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
|
||||
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
|
||||
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
|
||||
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
|
||||
waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
|
||||
|
||||
// Act
|
||||
// Mark the node unhealthy
|
||||
node2 := node1.DeepCopy()
|
||||
node2.Status.Conditions[0].Status = v1.ConditionFalse
|
||||
informerFactory.Core().V1().Nodes().Informer().GetStore().Update(node2)
|
||||
// Assert -- Detach was triggered after maxWaitForUnmountDuration
|
||||
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
|
||||
verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
|
||||
waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
|
||||
verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
|
||||
waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
|
||||
}
|
||||
|
||||
func Test_ReportMultiAttachError(t *testing.T) {
|
||||
type nodeWithPods struct {
|
||||
name k8stypes.NodeName
|
||||
|
Loading…
Reference in New Issue
Block a user