Merge attach and detach common func

This commit is contained in:
caiweidong 2019-09-11 15:13:47 +08:00
parent 5823cfce76
commit a890bf1175

View File

@ -28,7 +28,7 @@ import (
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -51,6 +51,8 @@ type csiAttacher struct {
csiClient csiClient
}
type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}
@ -148,79 +150,18 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim
}
func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
}
successful, err := verifyAttachmentStatus(attach, volumeHandle)
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
if err != nil {
return "", err
}
if successful {
return attachID, nil
}
watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case event, ok := <-ch:
if !ok {
klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
return "", errors.New("volume attachment watch channel had been closed")
}
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
successful, err := verifyAttachmentStatus(attach, volumeHandle)
if err != nil {
return "", err
}
if successful {
return attachID, nil
}
case watch.Deleted:
// if deleted, fail fast
klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment has been deleted")
case watch.Error:
klog.Warningf("waitForVolumeAttachmentInternal received watch error: %v", event)
}
case <-timer.C:
klog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
}
}
}
func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// if being deleted, fail fast
if attachment.GetDeletionTimestamp() != nil {
klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
return false, errors.New("volume attachment is being deleted")
}
// attachment OK
if attachment.Status.Attached {
return true, nil
}
// driver reports attach error
attachErr := attachment.Status.AttachError
if attachErr != nil {
klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return false, errors.New(attachErr.Message)
}
return false, nil
return attach.Name, nil
}
func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
@ -318,8 +259,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
if err = saveVolumeData(dataDir, volDataFileName, data); err != nil {
klog.Error(log("failed to save volume info data: %v", err))
if cleanerr := os.RemoveAll(dataDir); cleanerr != nil {
klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanerr))
if cleanErr := os.RemoveAll(dataDir); cleanErr != nil {
klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanErr))
}
return err
}
@ -445,7 +386,8 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
}
klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
return c.waitForVolumeDetachment(volID, attachID)
err := c.waitForVolumeDetachment(volID, attachID)
return err
}
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
@ -458,7 +400,8 @@ func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) err
return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error {
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer,
timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
@ -469,16 +412,26 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str
}
return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
}
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus)
if err != nil {
return err
}
return err
}
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}
watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return errors.New(log("watch error:%v for volume %v", err, volumeHandle))
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}
ch := watcher.ResultChan()
@ -494,24 +447,30 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}
case watch.Deleted:
//object deleted
// set attach nil to get different results
// for detachment, a deleted event means successful detachment, should return success
// for attachment, should return fail
if successful, err := verifyStatus(nil, volumeHandle); !successful {
return err
}
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
return nil
case watch.Error:
klog.Warningf("waitForVolumeDetachmentInternal received watch error: %v", event)
klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
}
case <-timer.C:
klog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("detachment timeout for volume %v", volumeHandle)
klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
}
}
}
@ -638,3 +597,42 @@ func getDriverAndVolNameFromDeviceMountPath(k8s kubernetes.Interface, deviceMoun
return csiSource.Driver, csiSource.VolumeHandle, nil
}
func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// when we received a deleted event during attachment, fail fast
if attachment == nil {
klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle))
return false, errors.New("volume attachment has been deleted")
}
// if being deleted, fail fast
if attachment.GetDeletionTimestamp() != nil {
klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
return false, errors.New("volume attachment is being deleted")
}
// attachment OK
if attachment.Status.Attached {
return true, nil
}
// driver reports attach error
attachErr := attachment.Status.AttachError
if attachErr != nil {
klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return false, errors.New(attachErr.Message)
}
return false, nil
}
func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// when we received a deleted event during detachment
// it means we have successfully detached it.
if attachment == nil {
return true, nil
}
// driver reports detach error
detachErr := attachment.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message))
return false, errors.New(detachErr.Message)
}
return false, nil
}