From 195a5af23be8050b016b3ee806edfc3b36cfb238 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Mon, 28 Dec 2020 12:22:10 -0800 Subject: [PATCH] Make volume attachment watch timeout configurable for csiAttacher --- pkg/volume/csi/csi_attacher.go | 13 +++---- pkg/volume/csi/csi_attacher_test.go | 54 +++++++++++++++++++++-------- pkg/volume/csi/csi_plugin.go | 5 +-- pkg/volume/csi/csi_plugin_test.go | 11 ++++-- pkg/volume/csi/csi_test.go | 8 ++--- 5 files changed, 63 insertions(+), 28 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 6617a6423e4..fbeebdb40f7 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -46,8 +46,9 @@ const ( ) type csiAttacher struct { - plugin *csiPlugin - k8s kubernetes.Interface + plugin *csiPlugin + k8s kubernetes.Interface + watchTimeout time.Duration csiClient csiClient } @@ -120,7 +121,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) } - if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil { + if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil { return "", err } @@ -256,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) @@ -397,7 +398,7 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { } klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) - err := c.waitForVolumeDetachment(volID, attachID, csiTimeout) + err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout) return err } @@ -514,7 +515,7 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 615a7e6f732..cc305c8c4a4 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -52,6 +52,8 @@ import ( volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) +const testWatchTimeout = 10 * time.Second + var ( bFalse = false bTrue = true @@ -209,7 +211,7 @@ func TestAttacherAttach(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) // FIXME: We need to ensure this goroutine exits in the test. go func(spec *volume.Spec, nodename string, fail bool) { @@ -293,7 +295,7 @@ func TestAttacherAttachWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) // FIXME: We need to ensure this goroutine exits in the test. go func(spec *volume.Spec, nodename string, fail bool) { @@ -378,7 +380,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -473,7 +475,7 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -547,7 +549,7 @@ func TestAttacherWaitForAttach(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -629,7 +631,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -716,7 +718,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) volID := fmt.Sprintf("test-vol-%d", i) @@ -812,7 +814,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -883,7 +885,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -976,7 +978,7 @@ func TestAttacherDetach(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pv := makeTestPV("test-pv", 10, testDriver, tc.volID) spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) @@ -1028,7 +1030,7 @@ func TestAttacherGetDeviceMountPath(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pluginDir := csiAttacher.plugin.host.GetPluginDir(plug.GetPluginName()) @@ -1193,7 +1195,7 @@ func TestAttacherMountDevice(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1356,7 +1358,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1484,7 +1486,7 @@ func TestAttacherUnmountDevice(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1630,3 +1632,27 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInf return csiPlug, fakeWatcher, tmpDir, fakeClient } + +func getCsiAttacherFromVolumeAttacher(attacher volume.Attacher) *csiAttacher { + csiAttacher := attacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromVolumeDetacher(detacher volume.Detacher) *csiAttacher { + csiAttacher := detacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceMounter(deviceMounter volume.DeviceMounter) *csiAttacher { + csiAttacher := deviceMounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceUnmounter(deviceUnmounter volume.DeviceUnmounter) *csiAttacher { + csiAttacher := deviceUnmounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 0102f9e3213..5291a729e7d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -944,8 +944,9 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { } return &csiAttacher{ - plugin: p, - k8s: k8s, + plugin: p, + k8s: k8s, + watchTimeout: csiTimeout, }, nil } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index f8426f46c0d..4f0db8de0cb 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "testing" + "time" api "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -881,13 +882,16 @@ func TestPluginNewAttacher(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if csiAttacher.plugin == nil { t.Error("plugin not set for attacher") } if csiAttacher.k8s == nil { t.Error("Kubernetes client not set for attacher") } + if csiAttacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for attacher") + } } func TestPluginNewDetacher(t *testing.T) { @@ -901,13 +905,16 @@ func TestPluginNewDetacher(t *testing.T) { t.Fatalf("failed to create new detacher: %v", err) } - csiDetacher := detacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(detacher) if csiDetacher.plugin == nil { t.Error("plugin not set for detacher") } if csiDetacher.k8s == nil { t.Error("Kubernetes client not set for detacher") } + if csiDetacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for detacher") + } } func TestPluginCanAttach(t *testing.T) { diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 116f972655f..a55c35ad44e 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -387,7 +387,7 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil { - csiDevMounter := devMounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) csiDevMounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) if err != nil { @@ -550,8 +550,8 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil && devUnmounter != nil { - csiDevMounter := devMounter.(*csiAttacher) - csiDevUnmounter := devUnmounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) + csiDevUnmounter := getCsiAttacherFromDeviceUnmounter(devUnmounter) csiDevUnmounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) @@ -592,7 +592,7 @@ func TestCSI_VolumeAll(t *testing.T) { if err != nil { t.Fatal("csiTest.VolumeAll volumePlugin.GetVolumeName failed:", err) } - csiDetacher := volDetacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher) csiDetacher.csiClient = csiClient if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil { t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err)