diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index f4fc458cd88..b53f8333c26 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -490,14 +490,9 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str if err != nil { return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) } - var watcherClosed bool - ch := watcher.ResultChan() - defer func() { - if !watcherClosed { - watcher.Stop() - } - }() + ch := watcher.ResultChan() + defer watcher.Stop() for { select { case event, ok := <-ch: @@ -521,10 +516,7 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str return nil case watch.Error: - watcher.Stop() - watcherClosed = true - // start another cycle - return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) + klog.Warningf("waitForVolumeDetachmentInternal received watch error: %v", event) } case <-timer.C: diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 2e3b177643c..c9ef770b74d 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -894,11 +894,12 @@ func TestAttacherDetach(t *testing.T) { nodeName := "test-node" testCases := []struct { - name string - volID string - attachID string - shouldFail bool - reactor func(action core.Action) (handled bool, ret runtime.Object, err error) + name string + volID string + attachID string + shouldFail bool + watcherError bool + reactor func(action core.Action) (handled bool, ret runtime.Object, err error) }{ {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)}, {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)}, @@ -916,6 +917,19 @@ func TestAttacherDetach(t *testing.T) { return false, nil, nil }, }, + { + name: "API watch error happen", + volID: "vol-005", + attachID: getAttachmentName("vol-005", testDriver, nodeName), + shouldFail: true, + watcherError: true, + reactor: func(action core.Action) (handled bool, ret runtime.Object, err error) { + if action.Matches("get", "volumeattachments") { + return true, makeTestAttachment(getAttachmentName("vol-005", testDriver, nodeName), nodeName, "vol-005"), nil + } + return false, nil, nil + }, + }, } for _, tc := range testCases { @@ -944,7 +958,14 @@ func TestAttacherDetach(t *testing.T) { if err != nil { t.Errorf("test case %s failed: %v", tc.name, err) } + watchError := tc.watcherError + csiAttacher.waitSleepTime = 100 * time.Millisecond go func() { + if watchError { + errStatus := apierrs.NewInternalError(fmt.Errorf("we got an error")).Status() + fakeWatcher.Error(&errStatus) + return + } fakeWatcher.Delete(attachment) }() err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))