diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 3b0b11b0aa9..79a1f2b3a2f 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -28,7 +28,7 @@ import ( "k8s.io/klog" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +51,8 @@ type csiAttacher struct { csiClient csiClient } +type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error) + // volume.Attacher methods var _ volume.Attacher = &csiAttacher{} @@ -148,79 +150,18 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim } func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) { + klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err) } - successful, err := verifyAttachmentStatus(attach, volumeHandle) + err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus) if err != nil { return "", err } - if successful { - return attachID, nil - } - - watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion})) - if err != nil { - return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) - } - - ch := watcher.ResultChan() - defer watcher.Stop() - - for { - select { - case event, ok := <-ch: - if !ok { - klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID) - return "", errors.New("volume attachment watch channel had been closed") - } - - switch event.Type { - case watch.Added, watch.Modified: - attach, _ := event.Object.(*storage.VolumeAttachment) - successful, err := verifyAttachmentStatus(attach, volumeHandle) - if err != nil { - return "", err - } - if successful { - return attachID, nil - } - case watch.Deleted: - // if deleted, fail fast - klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID)) - return "", errors.New("volume attachment has been deleted") - - case watch.Error: - klog.Warningf("waitForVolumeAttachmentInternal received watch error: %v", event) - } - - case <-timer.C: - klog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) - return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle) - } - } -} - -func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) { - // if being deleted, fail fast - if attachment.GetDeletionTimestamp() != nil { - klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name)) - return false, errors.New("volume attachment is being deleted") - } - // attachment OK - if attachment.Status.Attached { - return true, nil - } - // driver reports attach error - attachErr := attachment.Status.AttachError - if attachErr != nil { - klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) - return false, errors.New(attachErr.Message) - } - return false, nil + return attach.Name, nil } func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { @@ -318,8 +259,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } if err = saveVolumeData(dataDir, volDataFileName, data); err != nil { klog.Error(log("failed to save volume info data: %v", err)) - if cleanerr := os.RemoveAll(dataDir); cleanerr != nil { - klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanerr)) + if cleanErr := os.RemoveAll(dataDir); cleanErr != nil { + klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanErr)) } return err } @@ -445,7 +386,8 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { } klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) - return c.waitForVolumeDetachment(volID, attachID) + err := c.waitForVolumeDetachment(volID, attachID) + return err } func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error { @@ -458,7 +400,8 @@ func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) err return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) } -func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error { +func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, + timeout time.Duration) error { klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{}) if err != nil { @@ -469,16 +412,26 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str } return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) } - // driver reports attach error - detachErr := attach.Status.DetachError - if detachErr != nil { - klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) - return errors.New(detachErr.Message) + err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus) + if err != nil { + return err + } + return err +} + +func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string, + timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error { + successful, err := verifyStatus(attach, volumeHandle) + if err != nil { + return err + } + if successful { + return nil } watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion})) if err != nil { - return errors.New(log("watch error:%v for volume %v", err, volumeHandle)) + return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) } ch := watcher.ResultChan() @@ -494,24 +447,30 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str switch event.Type { case watch.Added, watch.Modified: attach, _ := event.Object.(*storage.VolumeAttachment) - // driver reports attach error - detachErr := attach.Status.DetachError - if detachErr != nil { - klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) - return errors.New(detachErr.Message) + successful, err := verifyStatus(attach, volumeHandle) + if err != nil { + return err + } + if successful { + return nil } case watch.Deleted: - //object deleted + // set attach nil to get different results + // for detachment, a deleted event means successful detachment, should return success + // for attachment, should return fail + if successful, err := verifyStatus(nil, volumeHandle); !successful { + return err + } klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle)) return nil case watch.Error: - klog.Warningf("waitForVolumeDetachmentInternal received watch error: %v", event) + klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event) } case <-timer.C: - klog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) - return fmt.Errorf("detachment timeout for volume %v", volumeHandle) + klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) + return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle) } } } @@ -638,3 +597,42 @@ func getDriverAndVolNameFromDeviceMountPath(k8s kubernetes.Interface, deviceMoun return csiSource.Driver, csiSource.VolumeHandle, nil } + +func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) { + // when we received a deleted event during attachment, fail fast + if attachment == nil { + klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle)) + return false, errors.New("volume attachment has been deleted") + } + // if being deleted, fail fast + if attachment.GetDeletionTimestamp() != nil { + klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name)) + return false, errors.New("volume attachment is being deleted") + } + // attachment OK + if attachment.Status.Attached { + return true, nil + } + // driver reports attach error + attachErr := attachment.Status.AttachError + if attachErr != nil { + klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) + return false, errors.New(attachErr.Message) + } + return false, nil +} + +func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) { + // when we received a deleted event during detachment + // it means we have successfully detached it. + if attachment == nil { + return true, nil + } + // driver reports detach error + detachErr := attachment.Status.DetachError + if detachErr != nil { + klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message)) + return false, errors.New(detachErr.Message) + } + return false, nil +}