From e1228754f60057e67f86f09f88ed190874974aa8 Mon Sep 17 00:00:00 2001 From: carlory Date: Wed, 28 Aug 2024 12:06:54 +0800 Subject: [PATCH] csi volume plugin stop watching the volumeattachment object if the object is not found or volume is not attached when kubelet wait for volume attached --- pkg/volume/csi/csi_attacher.go | 89 ++++------------------ pkg/volume/csi/csi_attacher_test.go | 113 ---------------------------- 2 files changed, 13 insertions(+), 189 deletions(-) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index b17646320fc..876df72c10d 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -32,7 +32,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -54,8 +53,6 @@ type csiAttacher struct { csiClient csiClient } -type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error) - // volume.Attacher methods var _ volume.Attacher = &csiAttacher{} @@ -141,38 +138,34 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string return "", nil } -func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) { +// WaitForAttach waits for the attach operation to complete and returns the device path when it is done. +// For CSI volumes, however, there is nothing to wait for: the device is found by the CSI driver later, in the NodeStage / NodePublish calls, +// so this function just returns the device metadata, which in this case is the VolumeAttachment name. If the target VolumeAttachment does not +// exist or is not attached, the function returns an error, and the caller (kubelet) should then retry it. +// Watching the VolumeAttachment here serves no purpose, so it has been removed. 
More details in https://issues.k8s.io/124398 +func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, _ time.Duration) (string, error) { source, err := getPVSourceFromSpec(spec) if err != nil { return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err)) } + volumeHandle := source.VolumeHandle attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName())) - return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout) -} - -func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) { - klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) - - timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable - defer timer.Stop() - - return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout) -} - -func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) { - - klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{}) if err != nil { klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err) } - err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus) + + successful, err := verifyAttachmentStatus(attach, volumeHandle) if err != nil { return "", err } + if !successful { + klog.Error(log("attacher.WaitForAttach failed for volume [%s] attached (will continue to try)", volumeHandle)) + return "", fmt.Errorf("volume %v is not attached for volume attachment %v", volumeHandle, attachID) + } return attach.Name, nil } @@ 
-532,62 +525,6 @@ func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spe } } -func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string, - timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error { - successful, err := verifyStatus(attach, volumeHandle) - if err != nil { - return err - } - if successful { - return nil - } - - watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion})) - if err != nil { - return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) - } - - ch := watcher.ResultChan() - defer watcher.Stop() - for { - select { - case event, ok := <-ch: - if !ok { - klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID) - return errors.New("volume attachment watch channel had been closed") - } - - switch event.Type { - case watch.Added, watch.Modified: - attach, _ := event.Object.(*storage.VolumeAttachment) - successful, err := verifyStatus(attach, volumeHandle) - if err != nil { - return err - } - if successful { - return nil - } - case watch.Deleted: - // set attach nil to get different results - // for detachment, a deleted event means successful detachment, should return success - // for attachment, should return fail - if successful, err := verifyStatus(nil, volumeHandle); !successful { - return err - } - klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle)) - return nil - - case watch.Error: - klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event) - } - - case <-timer.C: - klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) - return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle) - } - } -} - func (c *csiAttacher) 
UnmountDevice(deviceMountPath string) error { klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath)) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 5e2fff1d964..605f597e60b 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -649,119 +649,6 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { } } -func TestAttacherWaitForVolumeAttachment(t *testing.T) { - nodeName := "fakeNode" - testCases := []struct { - name string - initAttached bool - finalAttached bool - trigerWatchEventTime time.Duration - initAttachErr *storage.VolumeError - finalAttachErr *storage.VolumeError - timeout time.Duration - shouldFail bool - watchTimeout time.Duration - }{ - { - name: "attach success at get", - initAttached: true, - timeout: 50 * time.Millisecond, - shouldFail: false, - }, - { - name: "attachment error ant get", - initAttachErr: &storage.VolumeError{Message: "missing volume"}, - timeout: 30 * time.Millisecond, - shouldFail: true, - }, - { - name: "attach success at watch", - initAttached: false, - finalAttached: true, - trigerWatchEventTime: 5 * time.Millisecond, - timeout: 50 * time.Millisecond, - shouldFail: false, - }, - { - name: "attachment error ant watch", - initAttached: false, - finalAttached: false, - finalAttachErr: &storage.VolumeError{Message: "missing volume"}, - trigerWatchEventTime: 5 * time.Millisecond, - timeout: 30 * time.Millisecond, - shouldFail: true, - }, - { - name: "time ran out", - initAttached: false, - finalAttached: true, - trigerWatchEventTime: 100 * time.Millisecond, - timeout: 50 * time.Millisecond, - shouldFail: true, - }, - } - - for i, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fakeClient := fakeclient.NewSimpleClientset() - plug, tmpDir := newTestPlugin(t, fakeClient) - defer os.RemoveAll(tmpDir) - - fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", 
core.DefaultWatchReactor(fakeWatcher, nil)) - - attacher, err := plug.NewAttacher() - if err != nil { - t.Fatalf("failed to create new attacher: %v", err) - } - csiAttacher := getCsiAttacherFromVolumeAttacher(attacher, tc.watchTimeout) - - t.Logf("running test: %v", tc.name) - pvName := fmt.Sprintf("test-pv-%d", i) - volID := fmt.Sprintf("test-vol-%d", i) - attachID := getAttachmentName(volID, testDriver, nodeName) - attachment := makeTestAttachment(attachID, nodeName, pvName) - attachment.Status.Attached = tc.initAttached - attachment.Status.AttachError = tc.initAttachErr - _, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("failed to attach: %v", err) - } - - trigerWatchEventTime := tc.trigerWatchEventTime - finalAttached := tc.finalAttached - finalAttachErr := tc.finalAttachErr - var wg sync.WaitGroup - // after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment - if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(trigerWatchEventTime) - attachment := makeTestAttachment(attachID, nodeName, pvName) - attachment.Status.Attached = finalAttached - attachment.Status.AttachError = finalAttachErr - fakeWatcher.Modify(attachment) - }() - } - - retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout) - if tc.shouldFail && err == nil { - t.Error("expecting failure, but err is nil") - } - if tc.initAttachErr != nil && err != nil { - if tc.initAttachErr.Message != err.Error() { - t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error()) - } - } - if err == nil && retID != attachID { - t.Errorf("attacher.WaitForAttach not returning attachment ID") - } - wg.Wait() - }) - } -} - func TestAttacherVolumesAreAttached(t *testing.T) { type attachedSpec struct { volName string