diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 62123c8ef55..b98e093dad3 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -85,10 +85,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 18c6c0a7899..fbeebdb40f7 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -46,9 +46,9 @@ const ( ) type csiAttacher struct { - plugin *csiPlugin - k8s kubernetes.Interface - waitSleepTime time.Duration + plugin *csiPlugin + k8s kubernetes.Interface + watchTimeout time.Duration csiClient csiClient } @@ -121,7 +121,7 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) } - if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, csiTimeout); err != nil { + if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil { return "", err } @@ -257,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) @@ -398,7 +398,7 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { } klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) - err := c.waitForVolumeDetachment(volID, attachID, csiTimeout) + err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout) return err } @@ -515,7 +515,7 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error { } csi := c.csiClient - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout) defer cancel() // Check whether "STAGE_UNSTAGE_VOLUME" is set stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 5e45f87c2a0..35c25223bf7 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -34,24 +34,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/informers" - storageinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" - storagelister "k8s.io/client-go/listers/storage/v1" core "k8s.io/client-go/testing" - utiltesting "k8s.io/client-go/util/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake" - volumetest "k8s.io/kubernetes/pkg/volume/testing" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) +const testWatchTimeout = 10 * time.Second + var ( bFalse = false bTrue = true @@ -201,18 +197,24 @@ func TestAttacherAttach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) - // FIXME: We need to ensure this goroutine exits in the test. + var wg sync.WaitGroup + wg.Add(1) go func(spec *volume.Spec, nodename string, fail bool) { + defer wg.Done() attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) if !fail && err != nil { t.Errorf("expecting no failure, but got err: %v", err) @@ -237,6 +239,7 @@ func TestAttacherAttach(t *testing.T) { status.Attached = true } markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + wg.Wait() }) } } @@ -286,17 +289,23 @@ func TestAttacherAttachWithInline(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) - // FIXME: We need to ensure this goroutine exits in the test. + var wg sync.WaitGroup + wg.Add(1) go func(spec *volume.Spec, nodename string, fail bool) { + defer wg.Done() attachID, err := csiAttacher.Attach(spec, types.NodeName(nodename)) if fail != (err != nil) { t.Errorf("expecting no failure, but got err: %v", err) @@ -316,6 +325,7 @@ func TestAttacherAttachWithInline(t *testing.T) { status.Attached = true } markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + wg.Wait() }) } } @@ -355,7 +365,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true) + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attachmentWatchCreated := make(chan core.Action) @@ -378,7 +388,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -473,7 +483,7 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, test.driver, "test-vol"), false) pluginCanAttach, err := plug.CanAttach(spec) @@ -540,14 +550,15 @@ func TestAttacherWaitForAttach(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -622,14 +633,15 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if test.makeAttachment != nil { attachment := test.makeAttachment() @@ -709,14 +721,19 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) + t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) volID := fmt.Sprintf("test-vol-%d", i) @@ -732,9 +749,12 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { trigerWatchEventTime := tc.trigerWatchEventTime finalAttached := tc.finalAttached finalAttachErr := tc.finalAttachErr + var wg sync.WaitGroup // after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { + wg.Add(1) go func() { + defer wg.Done() time.Sleep(trigerWatchEventTime) attachment := makeTestAttachment(attachID, nodeName, pvName) attachment.Status.Attached = finalAttached @@ -755,6 +775,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { if err == nil && retID != attachID { t.Errorf("attacher.WaitForAttach not returning attachment ID") } + wg.Wait() }) } } @@ -812,7 +833,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -883,7 +904,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) nodeName := "fakeNode" var specs []*volume.Spec @@ -966,17 +987,22 @@ func TestAttacherDetach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) - plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + if tc.reactor != nil { - client.PrependReactor("*", "*", tc.reactor) + fakeClient.PrependReactor("*", "*", tc.reactor) } attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pv := makeTestPV("test-pv", 10, testDriver, tc.volID) spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) @@ -990,8 +1016,10 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("test case %s failed: %v", tc.name, err) } watchError := tc.watcherError - csiAttacher.waitSleepTime = 100 * time.Millisecond + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() if watchError { errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status() fakeWatcher.Error(&errStatus) @@ -1016,6 +1044,7 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("expecting attachment not to be nil, but it is") } } + wg.Wait() }) } } @@ -1023,13 +1052,14 @@ func TestAttacherDetach(t *testing.T) { func TestAttacherGetDeviceMountPath(t *testing.T) { // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) pluginDir := csiAttacher.plugin.host.GetPluginDir(plug.GetPluginName()) @@ -1188,13 +1218,18 @@ func TestAttacherMountDevice(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1203,6 +1238,7 @@ func TestAttacherMountDevice(t *testing.T) { nodeName := string(csiAttacher.plugin.host.GetNodeName()) attachID := getAttachmentName(tc.volName, testDriver, nodeName) + var wg sync.WaitGroup if tc.createAttachment { // Set up volume attachment @@ -1211,7 +1247,9 @@ func TestAttacherMountDevice(t *testing.T) { if err != nil { t.Fatalf("failed to attach: %v", err) } + wg.Add(1) go func() { + defer wg.Done() fakeWatcher.Delete(attachment) }() } @@ -1269,6 +1307,8 @@ func TestAttacherMountDevice(t *testing.T) { } } } + + wg.Wait() }) } } @@ -1351,13 +1391,18 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) + + fakeWatcher := watch.NewRaceFreeFake() + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1373,7 +1418,12 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { if err != nil { t.Fatalf("failed to attach: %v", err) } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() fakeWatcher.Delete(attachment) }() @@ -1411,6 +1461,8 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { t.Errorf("expected mount path: %s. got: %s", tc.deviceMountPath, vol.Path) } } + + wg.Wait() }) } } @@ -1479,13 +1531,14 @@ func TestAttacherUnmountDevice(t *testing.T) { t.Logf("Running test case: %s", tc.testName) // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet) if tc.deviceMountPath != "" { @@ -1565,69 +1618,26 @@ func TestAttacherUnmountDevice(t *testing.T) { } } -// create a plugin mgr to load plugins and setup a fake client -func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) { - tmpDir, err := utiltesting.MkTmpdir("csi-test") - if err != nil { - t.Fatalf("can't create temp dir: %v", err) - } - - if fakeClient == nil { - fakeClient = fakeclient.NewSimpleClientset() - } - fakeClient.Tracker().Add(&v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fakeNode", - }, - Spec: v1.NodeSpec{}, - }) - fakeWatcher := watch.NewRaceFreeFake() - if !setupInformer { - // TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer - // and the csiAttacher.Attach both endup reading from same channel causing hang in Attach(). - // So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer. - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - } - - // Start informer for CSIDrivers. - factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) - csiDriverInformer := factory.Storage().V1().CSIDrivers() - csiDriverLister := csiDriverInformer.Lister() - var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer - var volumeAttachmentLister storagelister.VolumeAttachmentLister - if setupInformer { - volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments() - volumeAttachmentLister = volumeAttachmentInformer.Lister() - } - - factory.Start(wait.NeverStop) - ctx, cancel := context.WithTimeout(context.Background(), TestInformerSyncTimeout) - defer cancel() - for ty, ok := range factory.WaitForCacheSync(ctx.Done()) { - if !ok { - t.Fatalf("failed to sync: %#v", ty) - } - } - - host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, - tmpDir, - fakeClient, - ProbeVolumePlugins(), - "fakeNode", - csiDriverLister, - volumeAttachmentLister, - ) - plugMgr := host.GetPluginMgr() - - plug, err := plugMgr.FindPluginByName(CSIPluginName) - if err != nil { - t.Fatalf("can't find plugin %v", CSIPluginName) - } - - csiPlug, ok := plug.(*csiPlugin) - if !ok { - t.Fatalf("cannot assert plugin to be type csiPlugin") - } - - return csiPlug, fakeWatcher, tmpDir, fakeClient +func getCsiAttacherFromVolumeAttacher(attacher volume.Attacher) *csiAttacher { + csiAttacher := attacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromVolumeDetacher(detacher volume.Detacher) *csiAttacher { + csiAttacher := detacher.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceMounter(deviceMounter volume.DeviceMounter) *csiAttacher { + csiAttacher := deviceMounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher +} + +func getCsiAttacherFromDeviceUnmounter(deviceUnmounter volume.DeviceUnmounter) *csiAttacher { + csiAttacher := deviceUnmounter.(*csiAttacher) + csiAttacher.watchTimeout = testWatchTimeout + return csiAttacher } diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 77826866dbf..5291a729e7d 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -944,9 +944,9 @@ func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) { } return &csiAttacher{ - plugin: p, - k8s: k8s, - waitSleepTime: 1 * time.Second, + plugin: p, + k8s: k8s, + watchTimeout: csiTimeout, }, nil } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index f8426f46c0d..140ca4834ec 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "testing" + "time" api "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -64,7 +65,17 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri csiDriverLister := csiDriverInformer.Lister() volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() volumeAttachmentLister := volumeAttachmentInformer.Lister() - go factory.Start(wait.NeverStop) + + factory.Start(wait.NeverStop) + syncedTypes := factory.WaitForCacheSync(wait.NeverStop) + if len(syncedTypes) != 2 { + t.Fatalf("informers are not synced") + } + for ty, ok := range syncedTypes { + if !ok { + t.Fatalf("failed to sync: %#v", ty) + } + } host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, tmpDir, @@ -86,14 +97,6 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri t.Fatalf("cannot assert plugin to be type csiPlugin") } - // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { - return csiDriverInformer.Informer().HasSynced(), nil - }) - - wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { - return volumeAttachmentInformer.Informer().HasSynced(), nil - }) return csiPlug, tmpDir } @@ -881,13 +884,16 @@ func TestPluginNewAttacher(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } - csiAttacher := attacher.(*csiAttacher) + csiAttacher := getCsiAttacherFromVolumeAttacher(attacher) if csiAttacher.plugin == nil { t.Error("plugin not set for attacher") } if csiAttacher.k8s == nil { t.Error("Kubernetes client not set for attacher") } + if csiAttacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for attacher") + } } func TestPluginNewDetacher(t *testing.T) { @@ -901,13 +907,16 @@ func TestPluginNewDetacher(t *testing.T) { t.Fatalf("failed to create new detacher: %v", err) } - csiDetacher := detacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(detacher) if csiDetacher.plugin == nil { t.Error("plugin not set for detacher") } if csiDetacher.k8s == nil { t.Error("Kubernetes client not set for detacher") } + if csiDetacher.watchTimeout == time.Duration(0) { + t.Error("watch timeout not set for detacher") + } } func TestPluginCanAttach(t *testing.T) { diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 116f972655f..a55c35ad44e 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -387,7 +387,7 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil { - csiDevMounter := devMounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) csiDevMounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) if err != nil { @@ -550,8 +550,8 @@ func TestCSI_VolumeAll(t *testing.T) { } if devMounter != nil && devUnmounter != nil { - csiDevMounter := devMounter.(*csiAttacher) - csiDevUnmounter := devUnmounter.(*csiAttacher) + csiDevMounter := getCsiAttacherFromDeviceMounter(devMounter) + csiDevUnmounter := getCsiAttacherFromDeviceUnmounter(devUnmounter) csiDevUnmounter.csiClient = csiClient devMountPath, err := csiDevMounter.GetDeviceMountPath(volSpec) @@ -592,7 +592,7 @@ func TestCSI_VolumeAll(t *testing.T) { if err != nil { t.Fatal("csiTest.VolumeAll volumePlugin.GetVolumeName failed:", err) } - csiDetacher := volDetacher.(*csiAttacher) + csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher) csiDetacher.csiClient = csiClient if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil { t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err) diff --git a/pkg/volume/csi/testing/testing.go b/pkg/volume/csi/testing/testing.go index 2c0a26169f3..89a740d8a74 100644 --- a/pkg/volume/csi/testing/testing.go +++ b/pkg/volume/csi/testing/testing.go @@ -52,7 +52,17 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod) csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() - go factory.Start(wait.NeverStop) + + factory.Start(wait.NeverStop) + syncedTypes := factory.WaitForCacheSync(wait.NeverStop) + if len(syncedTypes) != 1 { + t.Fatalf("informers are not synced") + } + for ty, ok := range syncedTypes { + if !ok { + t.Fatalf("failed to sync: %#v", ty) + } + } host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, @@ -69,10 +79,5 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl t.Fatalf("can't find plugin %v", csi.CSIPluginName) } - // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(csi.TestInformerSyncPeriod, csi.TestInformerSyncTimeout, func() (bool, error) { - return csiDriverInformer.Informer().HasSynced(), nil - }) - return plugMgr, &plug, tmpDir }