diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 283dd09f2e5..3b0b11b0aa9 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -480,14 +480,9 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str if err != nil { return errors.New(log("watch error:%v for volume %v", err, volumeHandle)) } - var watcherClosed bool - ch := watcher.ResultChan() - defer func() { - if !watcherClosed { - watcher.Stop() - } - }() + ch := watcher.ResultChan() + defer watcher.Stop() for { select { case event, ok := <-ch: @@ -511,10 +506,7 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str return nil case watch.Error: - watcher.Stop() - watcherClosed = true - // start another cycle - return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) + klog.Warningf("waitForVolumeDetachmentInternal received watch error: %v", event) } case <-timer.C: diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 6dff65ae663..95ab8e29f1a 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -894,11 +894,12 @@ func TestAttacherDetach(t *testing.T) { nodeName := "test-node" testCases := []struct { - name string - volID string - attachID string - shouldFail bool - reactor func(action core.Action) (handled bool, ret runtime.Object, err error) + name string + volID string + attachID string + shouldFail bool + watcherError bool + reactor func(action core.Action) (handled bool, ret runtime.Object, err error) }{ {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)}, {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)}, @@ -916,6 +917,19 @@ func TestAttacherDetach(t *testing.T) { return false, nil, nil }, }, + { + name: "API watch error happen", + volID: "vol-005", + attachID: getAttachmentName("vol-005", testDriver, nodeName), + shouldFail: true, + watcherError: true, + reactor: func(action core.Action) (handled bool, ret runtime.Object, err error) { + if action.Matches("get", "volumeattachments") { + return true, makeTestAttachment(getAttachmentName("vol-005", testDriver, nodeName), nodeName, "vol-005"), nil + } + return false, nil, nil + }, + }, } for _, tc := range testCases { @@ -944,7 +958,14 @@ func TestAttacherDetach(t *testing.T) { if err != nil { t.Errorf("test case %s failed: %v", tc.name, err) } + watchError := tc.watcherError + csiAttacher.waitSleepTime = 100 * time.Millisecond go func() { + if watchError { + errStatus := apierrs.NewInternalError(fmt.Errorf("we got an error")).Status() + fakeWatcher.Error(&errStatus) + return + } fakeWatcher.Delete(attachment) }() err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))