From b4e3f272b90d485541d371138471aee55bde2a63 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Mon, 28 Dec 2020 11:52:32 -0800 Subject: [PATCH 1/7] Remove unused waitSleepTime field in csiAttacher --- pkg/volume/csi/csi_attacher.go | 5 ++--- pkg/volume/csi/csi_attacher_test.go | 1 - pkg/volume/csi/csi_plugin.go | 5 ++--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 18c6c0a7899..6617a6423e4 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -46,9 +46,8 @@ const ( ) type csiAttacher struct { - plugin *csiPlugin - k8s kubernetes.Interface - waitSleepTime time.Duration + plugin *csiPlugin + k8s kubernetes.Interface csiClient csiClient } diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 5e45f87c2a0..615a7e6f732 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -990,7 +990,6 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("test case %s failed: %v", tc.name, err) } watchError := tc.watcherError - csiAttacher.waitSleepTime = 100 * time.Millisecond go func() { if watchError { errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 77826866dbf..0102f9e3213 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -944,9 +944,8 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { } return &csiAttacher{ - plugin: p, - k8s: k8s, - waitSleepTime: 1 * time.Second, + plugin: p, + k8s: k8s, }, nil } From 195a5af23be8050b016b3ee806edfc3b36cfb238 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Mon, 28 Dec 2020 12:22:10 -0800 Subject: [PATCH 2/7] Make volume attachment watch timeout configurable for csiAttacher --- pkg/volume/csi/csi_attacher.go | 13 +++---- pkg/volume/csi/csi_attacher_test.go | 54 +++++++++++++++++++++-------- pkg/volume/csi/csi_plugin.go | 5 +-- pkg/volume/csi/csi_plugin_test.go | 11 ++++-- pkg/volume/csi/csi_test.go | 8 ++--- 5 files changed, 63 insertions(+), 28 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 6617a6423e4..fbeebdb40f7 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -46,8 +46,9 @@ const ( ) type csiAttacher struct { - plugin *csiPlugin - k8s kubernetes.Interface + plugin *csiPlugin + k8s kubernetes.Interface + watchTimeout time.Duration csiClient csiClient } @@ -120,7 +121,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) } - if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil { + if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil { return "", err } @@ -256,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) @@ -397,7 +398,7 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { } klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) - err := c.waitForVolumeDetachment(volID, attachID, csiTimeout) + err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout) return err } @@ -514,7 +515,7 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 615a7e6f732..cc305c8c4a4 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -52,6 +52,8 @@ import ( volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) +const testWatchTimeout = 10 * time.Second + var ( bFalse = false bTrue = true @@ -209,7 +211,7 @@ func TestAttacherAttach(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) // FIXME: We need to ensure this goroutine exits in the test. go func(spec *volume.Spec, nodename string, fail bool) { @@ -293,7 +295,7 @@ func TestAttacherAttachWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) // FIXME: We need to ensure this goroutine exits in the test. go func(spec *volume.Spec, nodename string, fail bool) { @@ -378,7 +380,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -473,7 +475,7 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -547,7 +549,7 @@ func TestAttacherWaitForAttach(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -629,7 +631,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -716,7 +718,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) volID := fmt.Sprintf("test-vol-%d", i) @@ -812,7 +814,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -883,7 +885,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -976,7 +978,7 @@ func TestAttacherDetach(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pv := makeTestPV("test-pv", 10, testDriver, tc.volID) spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) @@ -1028,7 +1030,7 @@ func TestAttacherGetDeviceMountPath(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pluginDir := csiAttacher.plugin.host.GetPluginDir(plug.GetPluginName()) @@ -1193,7 +1195,7 @@ func TestAttacherMountDevice(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1356,7 +1358,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1484,7 +1486,7 @@ func TestAttacherUnmountDevice(t *testing.T) { if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1630,3 +1632,27 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInf return csiPlug, fakeWatcher, tmpDir, fakeClient } + +func getCsiAttacherFromVolumeAttacher(attacher volume.Attacher) *csiAttacher { + csiAttacher := attacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromVolumeDetacher(detacher volume.Detacher) *csiAttacher { + csiAttacher := detacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceMounter(deviceMounter volume.DeviceMounter) *csiAttacher { + csiAttacher := deviceMounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceUnmounter(deviceUnmounter volume.DeviceUnmounter) *csiAttacher { + csiAttacher := deviceUnmounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 0102f9e3213..5291a729e7d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -944,8 +944,9 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { } return &csiAttacher{ - plugin: p, - k8s: k8s, + plugin: p, + k8s: k8s, + watchTimeout: csiTimeout, }, nil } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index f8426f46c0d..4f0db8de0cb 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "testing" + "time" api "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -881,13 +882,16 @@ func TestPluginNewAttacher(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if csiAttacher.plugin == nil { t.Error("plugin not set for attacher") } if csiAttacher.k8s == nil { t.Error("Kubernetes client not set for attacher") } + if csiAttacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for attacher") + } } func TestPluginNewDetacher(t *testing.T) { @@ -901,13 +905,16 @@ func TestPluginNewDetacher(t *testing.T) { t.Fatalf("failed to create new detacher: %v", err) } - csiDetacher := detacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(detacher) if csiDetacher.plugin == nil { t.Error("plugin not set for detacher") } if csiDetacher.k8s == nil { t.Error("Kubernetes client not set for detacher") } + if csiDetacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for detacher") + } } func TestPluginCanAttach(t *testing.T) { diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 116f972655f..a55c35ad44e 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -387,7 +387,7 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil { - csiDevMounter := devMounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) csiDevMounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) if err != nil { @@ -550,8 +550,8 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil && devUnmounter != nil { - csiDevMounter := devMounter.(*csiAttacher) - csiDevUnmounter := devUnmounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) + csiDevUnmounter := getCsiAttacherFromDeviceUnmounter(devUnmounter) csiDevUnmounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) @@ -592,7 +592,7 @@ func TestCSI_VolumeAll(t *testing.T) { if err != nil { t.Fatal("csiTest.VolumeAll volumePlugin.GetVolumeName failed:", err) } - csiDetacher := volDetacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher) csiDetacher.csiClient = csiClient if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil { t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err) From f098ada9d167550f570bb5837ee885c5f0bbc895 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Tue, 29 Dec 2020 14:06:13 -0800 Subject: [PATCH 3/7] Extract watch reactor out of test CSI plugin constructor --- pkg/volume/csi/csi_attacher_test.go | 57 ++++++++++++++++++----------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index cc305c8c4a4..5fda9eb6973 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -203,9 +203,12 @@ func TestAttacherAttach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) @@ -288,9 +291,12 @@ func TestAttacherAttachWithInline(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) @@ -357,7 +363,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true) + plug, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true) defer os.RemoveAll(tmpDir) attachmentWatchCreated := make(chan core.Action) @@ -542,7 +548,7 @@ func TestAttacherWaitForAttach(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -624,7 +630,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -711,14 +717,18 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) + t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) volID := fmt.Sprintf("test-vol-%d", i) @@ -968,10 +978,14 @@ func TestAttacherDetach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) - plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + if tc.reactor != nil { - client.PrependReactor("*", "*", tc.reactor) + fakeClient.PrependReactor("*", "*", tc.reactor) } attacher, err0 := plug.NewAttacher() @@ -1024,7 +1038,7 @@ func TestAttacherDetach(t *testing.T) { func TestAttacherGetDeviceMountPath(t *testing.T) { // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1189,8 +1203,12 @@ func TestAttacherMountDevice(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) @@ -1352,8 +1370,12 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) @@ -1480,7 +1502,7 @@ func TestAttacherUnmountDevice(t *testing.T) { t.Logf("Running test case: %s", tc.testName) // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1567,7 +1589,7 @@ func TestAttacherUnmountDevice(t *testing.T) { } // create a plugin mgr to load plugins and setup a fake client -func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) { +func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, string, *fakeclient.Clientset) { tmpDir, err := utiltesting.MkTmpdir("csi-test") if err != nil { t.Fatalf("can't create temp dir: %v", err) @@ -1582,13 +1604,6 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInf }, Spec: v1.NodeSpec{}, }) - fakeWatcher := watch.NewRaceFreeFake() - if !setupInformer { - // TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer - // and the csiAttacher.Attach both endup reading from same channel causing hang in Attach(). - // So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer. - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - } // Start informer for CSIDrivers. factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) @@ -1630,7 +1645,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInf t.Fatalf("cannot assert plugin to be type csiPlugin") } - return csiPlug, fakeWatcher, tmpDir, fakeClient + return csiPlug, tmpDir, fakeClient } func getCsiAttacherFromVolumeAttacher(attacher volume.Attacher) *csiAttacher { From 70eddfefec1c24e097c197873f8121ad8abe7555 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Wed, 23 Dec 2020 13:25:32 -0800 Subject: [PATCH 4/7] Ensure goroutines exit in CSI attacher tests The reason for adding an expectedVolumeHost field is to prevent a channel read from hanging when a test fails prior to creating the VolumeAttachment watch. --- pkg/volume/csi/csi_attacher_test.go | 30 +++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 5fda9eb6973..64ecdf197cb 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -216,8 +216,10 @@ func TestAttacherAttach(t *testing.T) { csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) - // FIXME: We need to ensure this goroutine exits in the test. + var wg sync.WaitGroup + wg.Add(1) go func(spec *volume.Spec, nodename string, fail bool) { + defer wg.Done() attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) if !fail && err != nil { t.Errorf("expecting no failure, but got err: %v", err) @@ -242,6 +244,7 @@ func TestAttacherAttach(t *testing.T) { status.Attached = true } markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + wg.Wait() }) } } @@ -303,8 +306,10 @@ func TestAttacherAttachWithInline(t *testing.T) { } csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) - // FIXME: We need to ensure this goroutine exits in the test. + var wg sync.WaitGroup + wg.Add(1) go func(spec *volume.Spec, nodename string, fail bool) { + defer wg.Done() attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) if fail != (err != nil) { t.Errorf("expecting no failure, but got err: %v", err) @@ -324,6 +329,7 @@ func TestAttacherAttachWithInline(t *testing.T) { status.Attached = true } markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + wg.Wait() }) } } @@ -744,9 +750,12 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { trigerWatchEventTime := tc.trigerWatchEventTime finalAttached := tc.finalAttached finalAttachErr := tc.finalAttachErr + var wg sync.WaitGroup // after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { + wg.Add(1) go func() { + defer wg.Done() time.Sleep(trigerWatchEventTime) attachment := makeTestAttachment(attachID, nodeName, pvName) attachment.Status.Attached = finalAttached @@ -767,6 +776,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { if err == nil && retID != attachID { t.Errorf("attacher.WaitForAttach not returning attachment ID") } + wg.Wait() }) } } @@ -1006,7 +1016,10 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("test case %s failed: %v", tc.name, err) } watchError := tc.watcherError + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() if watchError { errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status() fakeWatcher.Error(&errStatus) @@ -1031,6 +1044,7 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("expecting attachment not to be nil, but it is") } } + wg.Wait() }) } } @@ -1222,6 +1236,7 @@ func TestAttacherMountDevice(t *testing.T) { nodeName := string(csiAttacher.plugin.host.GetNodeName()) attachID := getAttachmentName(tc.volName, testDriver, nodeName) + var wg sync.WaitGroup if tc.createAttachment { // Set up volume attachment @@ -1230,7 +1245,9 @@ func TestAttacherMountDevice(t *testing.T) { if err != nil { t.Fatalf("failed to attach: %v", err) } + wg.Add(1) go func() { + defer wg.Done() fakeWatcher.Delete(attachment) }() } @@ -1288,6 +1305,8 @@ func TestAttacherMountDevice(t *testing.T) { } } } + + wg.Wait() }) } } @@ -1396,7 +1415,12 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to attach: %v", err) } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() fakeWatcher.Delete(attachment) }() @@ -1434,6 +1458,8 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path) } } + + wg.Wait() }) } } From 257704b30d749e9b43440f1d95d692980ecc02cd Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Wed, 23 Dec 2020 19:45:54 -0800 Subject: [PATCH 5/7] Remove setupInformer argument from test CSI plugin constructor --- pkg/volume/csi/BUILD | 2 -- pkg/volume/csi/csi_attacher_test.go | 34 ++++++++++++----------------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 62123c8ef55..b98e093dad3 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -85,10 +85,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 64ecdf197cb..f381d2aaa36 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -38,10 +38,8 @@ import ( "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" - storageinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" - storagelister "k8s.io/client-go/listers/storage/v1" core "k8s.io/client-go/testing" utiltesting "k8s.io/client-go/util/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -203,7 +201,7 @@ func TestAttacherAttach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -294,7 +292,7 @@ func TestAttacherAttachWithInline(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -369,7 +367,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true) + plug, tmpDir, _ := newTestWatchPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attachmentWatchCreated := make(chan core.Action) @@ -554,7 +552,7 @@ func TestAttacherWaitForAttach(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -636,7 +634,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -723,7 +721,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -988,7 +986,7 @@ func TestAttacherDetach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1052,7 +1050,7 @@ func TestAttacherDetach(t *testing.T) { func TestAttacherGetDeviceMountPath(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1217,7 +1215,7 @@ func TestAttacherMountDevice(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1389,7 +1387,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil, false) + plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1528,7 +1526,7 @@ func TestAttacherUnmountDevice(t *testing.T) { t.Logf("Running test case: %s", tc.testName) // Setup // Create a new attacher - plug, tmpDir, _ := newTestWatchPlugin(t, nil, true) + plug, tmpDir, _ := newTestWatchPlugin(t, nil) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1615,7 +1613,7 @@ func TestAttacherUnmountDevice(t *testing.T) { } // create a plugin mgr to load plugins and setup a fake client -func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, string, *fakeclient.Clientset) { +func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, string, *fakeclient.Clientset) { tmpDir, err := utiltesting.MkTmpdir("csi-test") if err != nil { t.Fatalf("can't create temp dir: %v", err) @@ -1635,12 +1633,8 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInf factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() - var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer - var volumeAttachmentLister storagelister.VolumeAttachmentLister - if setupInformer { - volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments() - volumeAttachmentLister = volumeAttachmentInformer.Lister() - } + volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() + volumeAttachmentLister := volumeAttachmentInformer.Lister() factory.Start(wait.NeverStop) ctx, cancel := context.WithTimeout(context.Background(), TestInformerSyncTimeout) From 359b9e33830754c06ced68aada614e99ad68ba53 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Tue, 24 Nov 2020 17:03:17 -0800 Subject: [PATCH 6/7] Ensure CSI test plugin informers are synced before returning This way of syncing informers ensures all informers generated from these factories are synced. Informers are lazy loaded, and only created once a user calls .Informer() (which was why the .PollImmediate() worked). Asserting the number of synced types to ensure everything is set up correctly. --- pkg/volume/csi/csi_attacher_test.go | 6 +++++- pkg/volume/csi/csi_plugin_test.go | 20 +++++++++++--------- pkg/volume/csi/testing/testing.go | 17 +++++++++++------ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index f381d2aaa36..fbcdc5f59d2 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -1639,7 +1639,11 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu factory.Start(wait.NeverStop) ctx, cancel := context.WithTimeout(context.Background(), TestInformerSyncTimeout) defer cancel() - for ty, ok := range factory.WaitForCacheSync(ctx.Done()) { + syncedTypes := factory.WaitForCacheSync(ctx.Done()) + if len(syncedTypes) != 2 { + t.Fatalf("informers are not synced") + } + for ty, ok := range syncedTypes { if !ok { t.Fatalf("failed to sync: %#v", ty) } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 4f0db8de0cb..140ca4834ec 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -65,7 +65,17 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri csiDriverLister := csiDriverInformer.Lister() volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() volumeAttachmentLister := volumeAttachmentInformer.Lister() - go factory.Start(wait.NeverStop) + + factory.Start(wait.NeverStop) + syncedTypes := factory.WaitForCacheSync(wait.NeverStop) + if len(syncedTypes) != 2 { + t.Fatalf("informers are not synced") + } + for ty, ok := range syncedTypes { + if !ok { + t.Fatalf("failed to sync: %#v", ty) + } + } host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, tmpDir, @@ -87,14 +97,6 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri t.Fatalf("cannot assert plugin to be type csiPlugin") } - // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { - return csiDriverInformer.Informer().HasSynced(), nil - }) - - wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { - return volumeAttachmentInformer.Informer().HasSynced(), nil - }) return csiPlug, tmpDir } diff --git a/pkg/volume/csi/testing/testing.go b/pkg/volume/csi/testing/testing.go index 2c0a26169f3..89a740d8a74 100644 --- a/pkg/volume/csi/testing/testing.go +++ b/pkg/volume/csi/testing/testing.go @@ -52,7 +52,17 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod) csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() - go factory.Start(wait.NeverStop) + + factory.Start(wait.NeverStop) + syncedTypes := factory.WaitForCacheSync(wait.NeverStop) + if len(syncedTypes) != 1 { + t.Fatalf("informers are not synced") + } + for ty, ok := range syncedTypes { + if !ok { + t.Fatalf("failed to sync: %#v", ty) + } + } host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, @@ -69,10 +79,5 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl t.Fatalf("can't find plugin %v", csi.CSIPluginName) } - // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(csi.TestInformerSyncPeriod, csi.TestInformerSyncTimeout, func() (bool, error) { - return csiDriverInformer.Informer().HasSynced(), nil - }) - return plugMgr, &plug, tmpDir } From 736b7e15b74e95825f1e5826231508ac73265711 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Mon, 28 Dec 2020 13:51:46 -0800 Subject: [PATCH 7/7] Remove duplicate test CSI plugin constructor logic --- pkg/volume/csi/csi_attacher_test.go | 96 +++++++---------------------- 1 file changed, 21 insertions(+), 75 deletions(-) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index fbcdc5f59d2..35c25223bf7 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -34,19 +34,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - utiltesting "k8s.io/client-go/util/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake" - volumetest "k8s.io/kubernetes/pkg/volume/testing" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -201,7 +197,8 @@ func TestAttacherAttach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -292,7 +289,8 @@ func TestAttacherAttachWithInline(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -367,7 +365,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, tmpDir, _ := newTestWatchPlugin(t, fakeClient) + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attachmentWatchCreated := make(chan core.Action) @@ -552,7 +550,8 @@ func TestAttacherWaitForAttach(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, tmpDir, _ := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -634,7 +633,8 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, tmpDir, _ := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -721,7 +721,8 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -986,7 +987,8 @@ func TestAttacherDetach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1050,7 +1052,8 @@ func TestAttacherDetach(t *testing.T) { func TestAttacherGetDeviceMountPath(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, _ := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1215,7 +1218,8 @@ func TestAttacherMountDevice(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1387,7 +1391,8 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { // Setup // Create a new attacher - plug, tmpDir, fakeClient := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) fakeWatcher := watch.NewRaceFreeFake() @@ -1526,7 +1531,8 @@ func TestAttacherUnmountDevice(t *testing.T) { t.Logf("Running test case: %s", tc.testName) // Setup // Create a new attacher - plug, tmpDir, _ := newTestWatchPlugin(t, nil) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1612,66 +1618,6 @@ func TestAttacherUnmountDevice(t *testing.T) { } } -// create a plugin mgr to load plugins and setup a fake client -func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, string, *fakeclient.Clientset) { - tmpDir, err := utiltesting.MkTmpdir("csi-test") - if err != nil { - t.Fatalf("can't create temp dir: %v", err) - } - - if fakeClient == nil { - fakeClient = fakeclient.NewSimpleClientset() - } - fakeClient.Tracker().Add(&v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fakeNode", - }, - Spec: v1.NodeSpec{}, - }) - - // Start informer for CSIDrivers. - factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) - csiDriverInformer := factory.Storage().V1().CSIDrivers() - csiDriverLister := csiDriverInformer.Lister() - volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() - volumeAttachmentLister := volumeAttachmentInformer.Lister() - - factory.Start(wait.NeverStop) - ctx, cancel := context.WithTimeout(context.Background(), TestInformerSyncTimeout) - defer cancel() - syncedTypes := factory.WaitForCacheSync(ctx.Done()) - if len(syncedTypes) != 2 { - t.Fatalf("informers are not synced") - } - for ty, ok := range syncedTypes { - if !ok { - t.Fatalf("failed to sync: %#v", ty) - } - } - - host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, - tmpDir, - fakeClient, - ProbeVolumePlugins(), - "fakeNode", - csiDriverLister, - volumeAttachmentLister, - ) - plugMgr := host.GetPluginMgr() - - plug, err := plugMgr.FindPluginByName(CSIPluginName) - if err != nil { - t.Fatalf("can't find plugin %v", CSIPluginName) - } - - csiPlug, ok := plug.(*csiPlugin) - if !ok { - t.Fatalf("cannot assert plugin to be type csiPlugin") - } - - return csiPlug, tmpDir, fakeClient -} - func getCsiAttacherFromVolumeAttacher(attacher volume.Attacher) *csiAttacher { csiAttacher := attacher.(*csiAttacher) csiAttacher.watchTimeout = testWatchTimeout