From f36fec997ad7abd619aecc0d4e95ab928e3acb09 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Tue, 2 Feb 2021 10:30:34 -0800 Subject: [PATCH] Poll for VolumeAttachments in CSI attacher The CSI attacher that runs inside of the AttachDetachController has access to a VolumeAttachment lister. By polling this lister for the status of VolumeAttachments, we can save threads on the API server by not using watches. --- pkg/volume/csi/BUILD | 1 + pkg/volume/csi/csi_attacher.go | 196 +++++++++++++++++++--------- pkg/volume/csi/csi_attacher_test.go | 93 ++----------- pkg/volume/csi/csi_test.go | 43 +++--- 4 files changed, 171 insertions(+), 162 deletions(-) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index b98e093dad3..21e2880ecb3 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -29,6 +29,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 31841132b41..2cb6ec7bc8e 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -26,6 +26,7 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" @@ -34,6 +35,7 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/volume" @@ -76,52 +78,56 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string node := string(nodeName) attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node) - var vaSrc storage.VolumeAttachmentSource - if spec.InlineVolumeSpecForCSIMigration { - // inline PV scenario - use PV spec to populate VA source. - // The volume spec will be populated by CSI translation API - // for inline volumes. This allows fields required by the CSI - // attacher such as AccessMode and MountOptions (in addition to - // fields in the CSI persistent volume source) to be populated - // as part of CSI translation for inline volumes. - vaSrc = storage.VolumeAttachmentSource{ - InlineVolumeSpec: &spec.PersistentVolume.Spec, + attachment, err := c.plugin.volumeAttachmentLister.Get(attachID) + if err != nil && !apierrors.IsNotFound(err) { + return "", errors.New(log("failed to get volume attachment from lister: %v", err)) + } + + if attachment == nil { + var vaSrc storage.VolumeAttachmentSource + if spec.InlineVolumeSpecForCSIMigration { + // inline PV scenario - use PV spec to populate VA source. + // The volume spec will be populated by CSI translation API + // for inline volumes. This allows fields required by the CSI + // attacher such as AccessMode and MountOptions (in addition to + // fields in the CSI persistent volume source) to be populated + // as part of CSI translation for inline volumes. + vaSrc = storage.VolumeAttachmentSource{ + InlineVolumeSpec: &spec.PersistentVolume.Spec, + } + } else { + // regular PV scenario - use PV name to populate VA source + pvName := spec.PersistentVolume.GetName() + vaSrc = storage.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + } } - } else { - // regular PV scenario - use PV name to populate VA source - pvName := spec.PersistentVolume.GetName() - vaSrc = storage.VolumeAttachmentSource{ - PersistentVolumeName: &pvName, + + attachment := &storage.VolumeAttachment{ + ObjectMeta: meta.ObjectMeta{ + Name: attachID, + }, + Spec: storage.VolumeAttachmentSpec{ + NodeName: node, + Attacher: pvSrc.Driver, + Source: vaSrc, + }, + } + + _, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return "", errors.New(log("attacher.Attach failed: %v", err)) + } + klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle)) + } else { + klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) } } - attachment := &storage.VolumeAttachment{ - ObjectMeta: meta.ObjectMeta{ - Name: attachID, - }, - Spec: storage.VolumeAttachmentSpec{ - NodeName: node, - Attacher: pvSrc.Driver, - Source: vaSrc, - }, - } - - _, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) - alreadyExist := false - if err != nil { - if !apierrors.IsAlreadyExists(err) { - return "", errors.New(log("attacher.Attach failed: %v", err)) - } - alreadyExist = true - } - - if alreadyExist { - klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle)) - } else { - klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle)) - } - - if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil { + // Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController, + // and has access to a VolumeAttachment lister that can be polled for the current status. + if err := c.waitForVolumeAttachmentWithLister(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil { return "", err } @@ -166,6 +172,32 @@ func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID str return attach.Name, nil } +func (c *csiAttacher) waitForVolumeAttachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error { + klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) + + verifyStatus := func() (bool, error) { + volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID) + if err != nil { + // Ignore "not found" errors in case the VolumeAttachment was just created and hasn't yet made it into the lister. + if !apierrors.IsNotFound(err) { + klog.Error(log("unexpected error waiting for volume attachment, %v", err)) + return false, err + } + + // The VolumeAttachment is not available yet and we will have to try again. + return false, nil + } + + successful, err := verifyAttachmentStatus(volumeAttachment, volumeHandle) + if err != nil { + return false, err + } + return successful, nil + } + + return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Attach") +} + func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs))) @@ -399,36 +431,70 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { } klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) - err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout) - return err + + // Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController, + // and has access to a VolumeAttachment lister that can be polled for the current status. + return c.waitForVolumeDetachmentWithLister(volID, attachID, c.watchTimeout) } -func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string, timeout time.Duration) error { - klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) - - timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable - defer timer.Stop() - - return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) -} - -func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, - timeout time.Duration) error { +func (c *csiAttacher) waitForVolumeDetachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error { klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) - attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - //object deleted or never existed, done + + verifyStatus := func() (bool, error) { + volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID) + if err != nil { + if !apierrors.IsNotFound(err) { + return false, errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) + } + + // Detachment successful. klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle)) - return nil + return true, nil } - return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) + + // Detachment is only "successful" once the VolumeAttachment is deleted, however we perform + // this check to make sure the object does not contain any detach errors. + successful, err := verifyDetachmentStatus(volumeAttachment, volumeHandle) + if err != nil { + return false, err + } + return successful, nil } - err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus) - if err != nil { - return err + + return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Detach") +} + +func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID string, timeout time.Duration, verifyStatus func() (bool, error), operation string) error { + var ( + initBackoff = 500 * time.Millisecond + // This is approximately the duration between consecutive ticks after two minutes (CSI timeout). + maxBackoff = 7 * time.Second + resetDuration = time.Minute + backoffFactor = 1.05 + jitter = 0.1 + clock = &clock.RealClock{} + ) + backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock) + defer backoffMgr.Backoff().Stop() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + for { + select { + case <-backoffMgr.Backoff().C(): + successful, err := verifyStatus() + if err != nil { + return err + } + if successful { + return nil + } + case <-ctx.Done(): + klog.Error(log("%s timeout after %v [volume=%v; attachment.ID=%v]", operation, timeout, volumeHandle, attachID)) + return fmt.Errorf("%s timeout for volume %v", operation, volumeHandle) + } } - return err } func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string, diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index bb096dffbda..adf1bb2e31a 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -199,12 +199,9 @@ func TestAttacherAttach(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) fakeClient := fakeclient.NewSimpleClientset() - plug, tmpDir := newTestPlugin(t, fakeClient) + plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient) defer os.RemoveAll(tmpDir) - fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) @@ -234,12 +231,10 @@ func TestAttacherAttach(t *testing.T) { status.AttachError = &storage.VolumeError{ Message: "attacher error", } - errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status() - fakeWatcher.Error(&errStatus) } else { status.Attached = true } - markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status) wg.Wait() }) } @@ -291,12 +286,9 @@ func TestAttacherAttachWithInline(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) fakeClient := fakeclient.NewSimpleClientset() - plug, tmpDir := newTestPlugin(t, fakeClient) + plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient) defer os.RemoveAll(tmpDir) - fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) @@ -325,7 +317,7 @@ func TestAttacherAttachWithInline(t *testing.T) { } else { status.Attached = true } - markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) + markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status) wg.Wait() }) } @@ -366,25 +358,9 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, tmpDir := newTestPlugin(t, fakeClient) + plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient) defer os.RemoveAll(tmpDir) - attachmentWatchCreated := make(chan core.Action) - // Make sure this is the first reactor - fakeClient.Fake.PrependWatchReactor("volumeattachments", func(action core.Action) (bool, watch.Interface, error) { - select { - case <-attachmentWatchCreated: - // already closed - default: - // The attacher is already watching the attachment, notify the test goroutine to - // update the status of attachment. - // TODO: In theory this still has a race condition, because the actual watch is created by - // the next reactor in the chain and we unblock the test goroutine before returning here. - close(attachmentWatchCreated) - } - return false, nil, nil - }) - attacher, err := plug.NewAttacher() if err != nil { t.Fatalf("failed to create new attacher: %v", err) @@ -423,9 +399,6 @@ func TestAttacherWithCSIDriver(t *testing.T) { status := storage.VolumeAttachmentStatus{ Attached: true, } - // We want to ensure the watcher, which is created in csiAttacher, - // has been started before updating the status of attachment. - <-attachmentWatchCreated markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status) } wg.Wait() @@ -827,7 +800,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, tmpDir := newTestPlugin(t, nil) + plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, nil) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -947,12 +920,11 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { func TestAttacherDetach(t *testing.T) { nodeName := "fakeNode" testCases := []struct { - name string - volID string - attachID string - shouldFail bool - watcherError bool - reactor func(action core.Action) (handled bool, ret runtime.Object, err error) + name string + volID string + attachID string + shouldFail bool + reactor func(action core.Action) (handled bool, ret runtime.Object, err error) }{ {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)}, {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)}, @@ -970,31 +942,15 @@ func TestAttacherDetach(t *testing.T) { return false, nil, nil }, }, - { - name: "API watch error happen", - volID: "vol-005", - attachID: getAttachmentName("vol-005", testDriver, nodeName), - shouldFail: true, - watcherError: true, - reactor: func(action core.Action) (handled bool, ret runtime.Object, err error) { - if action.Matches("get", "volumeattachments") { - return true, makeTestAttachment(getAttachmentName("vol-005", testDriver, nodeName), nodeName, "vol-005"), nil - } - return false, nil, nil - }, - }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) fakeClient := fakeclient.NewSimpleClientset() - plug, tmpDir := newTestPlugin(t, fakeClient) + plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient) defer os.RemoveAll(tmpDir) - fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - if tc.reactor != nil { fakeClient.PrependReactor("*", "*", tc.reactor) } @@ -1016,18 +972,7 @@ func TestAttacherDetach(t *testing.T) { if err != nil { t.Errorf("test case %s failed: %v", tc.name, err) } - watchError := tc.watcherError - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if watchError { - errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status() - fakeWatcher.Error(&errStatus) - return - } - fakeWatcher.Delete(attachment) - }() + err = csiAttacher.Detach(volumeName, types.NodeName(nodeName)) if tc.shouldFail && err == nil { t.Fatal("expecting failure, but err = nil") @@ -1045,7 +990,6 @@ func TestAttacherDetach(t *testing.T) { t.Errorf("expecting attachment not to be nil, but it is") } } - wg.Wait() }) } } @@ -1239,9 +1183,6 @@ func TestAttacherMountDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) - fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) - attacher, err0 := plug.NewAttacher() if err0 != nil { t.Fatalf("failed to create new attacher: %v", err0) @@ -1255,7 +1196,6 @@ func TestAttacherMountDevice(t *testing.T) { nodeName := string(csiAttacher.plugin.host.GetNodeName()) attachID := getAttachmentName(tc.volName, testDriver, nodeName) - var wg sync.WaitGroup if tc.createAttachment { // Set up volume attachment @@ -1264,11 +1204,6 @@ func TestAttacherMountDevice(t *testing.T) { if err != nil { t.Fatalf("failed to attach: %v", err) } - wg.Add(1) - go func() { - defer wg.Done() - fakeWatcher.Delete(attachment) - }() } parent := filepath.Dir(tc.deviceMountPath) @@ -1359,8 +1294,6 @@ func TestAttacherMountDevice(t *testing.T) { } } } - - wg.Wait() }) } } diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index a55c35ad44e..e6a004ed83d 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" fakeclient "k8s.io/client-go/kubernetes/fake" @@ -274,7 +273,6 @@ func TestCSI_VolumeAll(t *testing.T) { }) client := fakeclient.NewSimpleClientset(objs...) - fakeWatcher := watch.NewRaceFreeFake() factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */) csiDriverInformer := factory.Storage().V1().CSIDrivers() @@ -282,10 +280,11 @@ func TestCSI_VolumeAll(t *testing.T) { if driverInfo != nil { csiDriverInformer.Informer().GetStore().Add(driverInfo) } + factory.Start(wait.NeverStop) factory.WaitForCacheSync(wait.NeverStop) - host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, + attachDetachVolumeHost := volumetest.NewFakeAttachDetachVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), @@ -293,18 +292,18 @@ func TestCSI_VolumeAll(t *testing.T) { csiDriverInformer.Lister(), volumeAttachmentInformer.Lister(), ) - plugMgr := host.GetPluginMgr() + attachDetachPlugMgr := attachDetachVolumeHost.GetPluginMgr() csiClient := setupClient(t, true) volSpec := test.specFunc(test.specName, test.driver, test.volName) pod := test.podFunc() - attachName := getAttachmentName(test.volName, test.driver, string(host.GetNodeName())) + attachName := getAttachmentName(test.volName, test.driver, string(attachDetachVolumeHost.GetNodeName())) t.Log("csiTest.VolumeAll starting...") // *************** Attach/Mount volume resources ****************// // attach volume t.Log("csiTest.VolumeAll Attaching volume...") - attachPlug, err := plugMgr.FindAttachablePluginBySpec(volSpec) + attachPlug, err := attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec) if err != nil { if !test.shouldFail { t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err) @@ -333,10 +332,8 @@ func TestCSI_VolumeAll(t *testing.T) { } // creates VolumeAttachment and blocks until it is marked attached (done by external attacher) - attachDone := make(chan struct{}) go func() { - defer close(attachDone) - attachID, err := volAttacher.Attach(volSpec, host.GetNodeName()) + attachID, err := volAttacher.Attach(volSpec, attachDetachVolumeHost.GetNodeName()) if err != nil { t.Errorf("csiTest.VolumeAll attacher.Attach failed: %s", err) return @@ -345,8 +342,7 @@ func TestCSI_VolumeAll(t *testing.T) { }() // Simulates external-attacher and marks VolumeAttachment.Status.Attached = true - markVolumeAttached(t, host.GetKubeClient(), fakeWatcher, attachName, storage.VolumeAttachmentStatus{Attached: true}) - <-attachDone + markVolumeAttached(t, attachDetachVolumeHost.GetKubeClient(), nil, attachName, storage.VolumeAttachmentStatus{Attached: true}) // Observe attach on this node. devicePath, err = volAttacher.WaitForAttach(volSpec, "", pod, 500*time.Millisecond) @@ -364,9 +360,22 @@ func TestCSI_VolumeAll(t *testing.T) { t.Log("csiTest.VolumeAll volume attacher not found, skipping attachment") } + // The reason for separate volume hosts here is because the attach/detach behavior is exclusive to the + // CSI plugin running in the AttachDetachController. Similarly, the mount/unmount behavior is exclusive + // to the CSI plugin running in the Kubelet. + kubeletVolumeHost := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, + tmpDir, + client, + ProbeVolumePlugins(), + "fakeNode", + csiDriverInformer.Lister(), + volumeAttachmentInformer.Lister(), + ) + kubeletPlugMgr := kubeletVolumeHost.GetPluginMgr() + // Mount Device t.Log("csiTest.VolumeAll Mouting device...") - devicePlug, err := plugMgr.FindDeviceMountablePluginBySpec(volSpec) + devicePlug, err := kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec) if err != nil { t.Fatalf("csiTest.VolumeAll PluginManager.FindDeviceMountablePluginBySpec failed: %v", err) } @@ -403,7 +412,7 @@ func TestCSI_VolumeAll(t *testing.T) { // mount volume t.Log("csiTest.VolumeAll Mouting volume...") - volPlug, err := plugMgr.FindPluginBySpec(volSpec) + volPlug, err := kubeletPlugMgr.FindPluginBySpec(volSpec) if err != nil || volPlug == nil { t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err) } @@ -499,7 +508,7 @@ func TestCSI_VolumeAll(t *testing.T) { t.Log("csiTest.VolumeAll Tearing down...") // unmount volume t.Log("csiTest.VolumeAll Unmouting volume...") - volPlug, err = plugMgr.FindPluginBySpec(volSpec) + volPlug, err = kubeletPlugMgr.FindPluginBySpec(volSpec) if err != nil || volPlug == nil { t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err) } @@ -525,7 +534,7 @@ func TestCSI_VolumeAll(t *testing.T) { // unmount device t.Log("csiTest.VolumeAll Unmouting device...") - devicePlug, err = plugMgr.FindDeviceMountablePluginBySpec(volSpec) + devicePlug, err = kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec) if err != nil { t.Fatalf("csiTest.VolumeAll failed to create mountable device plugin: %s", err) } @@ -569,7 +578,7 @@ func TestCSI_VolumeAll(t *testing.T) { // detach volume t.Log("csiTest.VolumeAll Detaching volume...") - attachPlug, err = plugMgr.FindAttachablePluginBySpec(volSpec) + attachPlug, err = attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec) if err != nil { t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err) } @@ -594,7 +603,7 @@ func TestCSI_VolumeAll(t *testing.T) { } csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher) csiDetacher.csiClient = csiClient - if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil { + if err := csiDetacher.Detach(volName, attachDetachVolumeHost.GetNodeName()); err != nil { t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err) } t.Log("csiTest.VolumeAll detacher.Detach succeeded for volume", volName)