Handle Non-graceful Node Shutdown (#108486)
Signed-off-by: Ashutosh Kumar <sonasingh46@gmail.com>
Co-authored-by: Ashutosh Kumar <sonasingh46@gmail.com>
Co-authored-by: xing-yang <xingyang105@gmail.com>
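In short: with the `NodeOutOfServiceVolumeDetach` feature gate enabled, the attach/detach controller's reconciler now treats a node carrying the `node.kubernetes.io/out-of-service` taint as non-gracefully shut down: it no longer waits for volumes on that node to be unmounted, and it skips the safe-to-detach verification, so the volumes can be attached elsewhere. To make the taint check possible, a NodeLister is threaded through NewAttachDetachController, NewReconciler, and the reconciler struct. A minimal sketch (hypothetical helper name, mirroring the Node object the new tests construct) of a node carrying that taint:

package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// outOfServiceNode returns a Node tainted out-of-service, the condition the
// reconciler now reacts to (v1.TaintNodeOutOfService is the key
// "node.kubernetes.io/out-of-service").
func outOfServiceNode(name string) *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: v1.NodeSpec{
			Taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, Effect: v1.TaintEffectNoExecute}},
		},
	}
}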
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -180,6 +180,7 @@ func NewAttachDetachController(
 		adc.actualStateOfWorld,
 		adc.attacherDetacher,
 		adc.nodeStatusUpdater,
+		adc.nodeLister,
 		recorder)

 	csiTranslator := csitrans.New()
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -27,13 +27,17 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
+	"k8s.io/kubernetes/pkg/features"
 	kevents "k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
+	"k8s.io/kubernetes/pkg/util/taints"
 	"k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
 )
@@ -69,6 +73,7 @@ func NewReconciler(
 	actualStateOfWorld cache.ActualStateOfWorld,
 	attacherDetacher operationexecutor.OperationExecutor,
 	nodeStatusUpdater statusupdater.NodeStatusUpdater,
+	nodeLister corelisters.NodeLister,
 	recorder record.EventRecorder) Reconciler {
 	return &reconciler{
 		loopPeriod:                loopPeriod,
@@ -79,6 +84,7 @@ func NewReconciler(
 		actualStateOfWorld:        actualStateOfWorld,
 		attacherDetacher:          attacherDetacher,
 		nodeStatusUpdater:         nodeStatusUpdater,
+		nodeLister:                nodeLister,
 		timeOfLastSync:            time.Now(),
 		recorder:                  recorder,
 	}
@@ -92,6 +98,7 @@ type reconciler struct {
 	actualStateOfWorld        cache.ActualStateOfWorld
 	attacherDetacher          operationexecutor.OperationExecutor
 	nodeStatusUpdater         statusupdater.NodeStatusUpdater
+	nodeLister                corelisters.NodeLister
 	timeOfLastSync            time.Time
 	disableReconciliationSync bool
 	recorder                  record.EventRecorder
@@ -134,6 +141,19 @@ func (rc *reconciler) syncStates() {
 	rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
 }

+// hasOutOfServiceTaint returns true if the node has the out-of-service taint present
+// and the `NodeOutOfServiceVolumeDetach` feature gate is enabled.
+func (rc *reconciler) hasOutOfServiceTaint(nodeName types.NodeName) (bool, error) {
+	if utilfeature.DefaultFeatureGate.Enabled(features.NodeOutOfServiceVolumeDetach) {
+		node, err := rc.nodeLister.Get(string(nodeName))
+		if err != nil {
+			return false, err
+		}
+		return taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService), nil
+	}
+	return false, nil
+}
+
 func (rc *reconciler) reconcile() {
 	// Detaches are triggered before attaches so that volumes referenced by
 	// pods that are rescheduled to a different node are detached first.
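The helper above is a simple predicate: with the feature gate off it always reports false; with it on, it reports whether the node's taints include the out-of-service key, and lister errors propagate to the caller. A standalone sketch (hypothetical name, same taints utility) of the gate-on branch:

package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/util/taints"
)

// nodeIsOutOfService isolates the gate-on branch of hasOutOfServiceTaint:
// true iff the node carries the node.kubernetes.io/out-of-service taint key,
// regardless of the taint's value or effect.
func nodeIsOutOfService(node *v1.Node) bool {
	return taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService)
}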
@@ -183,8 +203,15 @@ func (rc *reconciler) reconcile() {
 			}
 			// Check whether timeout has reached the maximum waiting time
 			timeout := elapsedTime > rc.maxWaitForUnmountDuration
+
+			hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName)
+			if err != nil {
+				klog.Errorf("failed to get taint specs for node %s: %s", attachedVolume.NodeName, err.Error())
+			}
+
 			// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
-			if attachedVolume.MountedByNode && !timeout {
+			// or the node has `node.kubernetes.io/out-of-service` taint.
+			if attachedVolume.MountedByNode && !timeout && !hasOutOfServiceTaint {
 				klog.V(5).InfoS("Cannot detach volume because it is still mounted", "volume", attachedVolume)
 				continue
 			}
@@ -211,8 +238,12 @@ func (rc *reconciler) reconcile() {

 			// Trigger detach volume which requires verifying safe to detach step
 			// If timeout is true, skip verifySafeToDetach check
+			// If the node has node.kubernetes.io/out-of-service taint with NoExecute effect, skip verifySafeToDetach check
 			klog.V(5).InfoS("Starting attacherDetacher.DetachVolume", "volume", attachedVolume)
-			verifySafeToDetach := !timeout
+			if hasOutOfServiceTaint {
+				klog.V(4).Infof("node %q has out-of-service taint", attachedVolume.NodeName)
+			}
+			verifySafeToDetach := !(timeout || hasOutOfServiceTaint)
 			err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
 			if err == nil {
 				if !timeout {
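Condensed, the two reconcile() changes above come down to a pair of boolean decisions; a sketch with hypothetical helper names (not part of the commit):

package sketch

// skipDetach mirrors the first hunk: a volume still mounted by the node is
// left alone unless the unmount wait has timed out or the node is tainted
// out-of-service.
func skipDetach(mountedByNode, timedOut, outOfService bool) bool {
	return mountedByNode && !timedOut && !outOfService
}

// safeToDetachNeeded mirrors the second hunk: the safe-to-detach
// verification is bypassed in exactly those two forcing cases.
func safeToDetachNeeded(timedOut, outOfService bool) bool {
	return !(timedOut || outOfService)
}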
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
@@ -21,14 +21,18 @@ import (
 	"time"

 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/tools/record"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
 	controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
+	"k8s.io/kubernetes/pkg/features"
 	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
 	"k8s.io/kubernetes/pkg/volume/util/types"
@@ -36,9 +40,10 @@ import (
 )

 const (
-	reconcilerLoopPeriod      time.Duration = 10 * time.Millisecond
-	syncLoopPeriod            time.Duration = 100 * time.Minute
-	maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond
+	reconcilerLoopPeriod          time.Duration = 10 * time.Millisecond
+	syncLoopPeriod                time.Duration = 100 * time.Minute
+	maxWaitForUnmountDuration     time.Duration = 50 * time.Millisecond
+	maxLongWaitForUnmountDuration time.Duration = 4200 * time.Second
 )

 // Calls Run()
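A note on the new constant: 4200 * time.Second is 70 minutes, far beyond any test's runtime. The new tests pass maxLongWaitForUnmountDuration as the reconciler's maxWaitForUnmountDuration so that the timeout path cannot fire during a test; any detach observed can then only be attributed to the out-of-service taint.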
@@ -48,6 +53,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
 	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
 	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
 	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+
 	fakeKubeClient := controllervolumetesting.CreateTestClient()
 	fakeRecorder := &record.FakeRecorder{}
 	fakeHandler := volumetesting.NewBlockVolumePathHandler()
@@ -59,8 +65,9 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
 	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 	nsu := statusupdater.NewNodeStatusUpdater(
 		fakeKubeClient, informerFactory.Core().V1().Nodes().Lister(), asw)
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)

 	// Act
 	ch := make(chan struct{})
@@ -91,9 +98,11 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName := "pod-uid"
 	volumeName := v1.UniqueVolumeName("volume-name")
 	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -142,9 +151,11 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName := "pod-uid"
 	volumeName := v1.UniqueVolumeName("volume-name")
 	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -214,9 +225,11 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName := "pod-uid"
 	volumeName := v1.UniqueVolumeName("volume-name")
 	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -286,9 +299,11 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName := "pod-uid"
 	volumeName := v1.UniqueVolumeName("volume-name")
 	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -362,8 +377,10 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.
 		fakeRecorder,
 		fakeHandler))
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName1 := "pod-uid1"
 	podName2 := "pod-uid2"
 	volumeName := v1.UniqueVolumeName("volume-name")
@@ -453,9 +470,11 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName1 := "pod-uid1"
 	podName2 := "pod-uid2"
 	volumeName := v1.UniqueVolumeName("volume-name")
@@ -543,9 +562,11 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName1 := "pod-uid1"
 	podName2 := "pod-uid2"
 	volumeName := v1.UniqueVolumeName("volume-name")
@@ -604,9 +625,11 @@ func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) {
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName1 := "pod-uid1"
 	podName2 := "pod-uid2"
 	podName3 := "pod-uid3"
@@ -705,9 +728,11 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T
 		volumePluginMgr,
 		fakeRecorder,
 		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 	reconciler := NewReconciler(
-		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)
 	podName1 := "pod-uid1"
 	podName2 := "pod-uid2"
 	volumeName := v1.UniqueVolumeName("volume-name")
@@ -752,6 +777,165 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T

 }

+// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
+// The node has the node.kubernetes.io/out-of-service taint present.
+//
+// The maxWaitForUnmountDuration is longer (in this case it is 4200 * time.Second) so that detach does not happen
+// immediately due to timeout.
+//
+// Calls Run()
+// Verifies there is one attach call and no detach calls.
+// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there is one detach call and no (new) attach calls.
+func Test_Run_OneVolumeDetachOnOutOfServiceTaintedNode(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)()
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		fakeKubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxLongWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad,
+		nsu, nodeLister, fakeRecorder)
+	podName1 := "pod-uid1"
+	volumeName1 := v1.UniqueVolumeName("volume-name1")
+	volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
+	nodeName1 := k8stypes.NodeName("worker-0")
+	node1 := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)},
+		Spec: v1.NodeSpec{
+			Taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, Effect: v1.TaintEffectNoExecute}},
+		},
+	}
+	informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1)
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	volumeExists := dsw.VolumeExists(volumeName1, nodeName1)
+	if volumeExists {
+		t.Fatalf(
+			"Volume %q/node %q should not exist, but it does.",
+			volumeName1,
+			nodeName1)
+	}
+
+	generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1,
+		podName1), volumeSpec1, nodeName1)
+	if podErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	// Delete the pod; ordinarily the volume would be detached only after the maxLongWaitForUnmountDuration expires,
+	// as the volume is not unmounted. Here maxLongWaitForUnmountDuration is used to mimic that the node is out of service.
+	// But in this case the node has the node.kubernetes.io/out-of-service taint and hence it will not wait for
+	// maxLongWaitForUnmountDuration and will progress to detach immediately.
+	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+	// Assert -- Detach will be triggered if the node has the out-of-service taint
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+}
+
+// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
+// The node does not have the node.kubernetes.io/out-of-service taint present.
+//
+// The maxWaitForUnmountDuration is longer (in this case it is 4200 * time.Second) so that detach does not happen
+// immediately due to timeout.
+//
+// Calls Run()
+// Verifies there is one attach call and no detach calls.
+// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there is no detach call and no (new) attach calls.
+func Test_Run_OneVolumeDetachOnNoOutOfServiceTaintedNode(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)()
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
+	asw := cache.NewActualStateOfWorld(volumePluginMgr)
+	fakeKubeClient := controllervolumetesting.CreateTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		fakeKubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		fakeHandler))
+	informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
+	reconciler := NewReconciler(
+		reconcilerLoopPeriod, maxLongWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad,
+		nsu, nodeLister, fakeRecorder)
+	podName1 := "pod-uid1"
+	volumeName1 := v1.UniqueVolumeName("volume-name1")
+	volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1)
+	nodeName1 := k8stypes.NodeName("worker-0")
+	node1 := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)},
+	}
+	informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1)
+	dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/)
+	volumeExists := dsw.VolumeExists(volumeName1, nodeName1)
+	if volumeExists {
+		t.Fatalf(
+			"Volume %q/node %q should not exist, but it does.",
+			volumeName1,
+			nodeName1)
+	}
+
+	generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1,
+		podName1), volumeSpec1, nodeName1)
+	if podErr != nil {
+		t.Fatalf("AddPod failed. Expected: <no error> Actual: <%v>", podErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	// Delete the pod; the volume will be detached only after the maxLongWaitForUnmountDuration expires, as the volume
+	// is not unmounted. Here maxLongWaitForUnmountDuration is used to mimic that the node is out of service.
+	// But in this case the node does not have the node.kubernetes.io/out-of-service taint and hence it will wait for
+	// maxLongWaitForUnmountDuration and will not be detached immediately.
+	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+	// Assert -- Detach will be triggered only after maxLongWaitForUnmountDuration expires
+	waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+}
+
 func Test_ReportMultiAttachError(t *testing.T) {
 	type nodeWithPods struct {
 		name k8stypes.NodeName
@@ -810,9 +994,11 @@ func Test_ReportMultiAttachError(t *testing.T) {
 			volumePluginMgr,
 			fakeRecorder,
 			fakeHandler))
+		informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
+		nodeLister := informerFactory.Core().V1().Nodes().Lister()
 		nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
 		rc := NewReconciler(
-			reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
+			reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder)

 		nodes := []k8stypes.NodeName{}
 		for _, n := range test.nodes {