From ddf4c2d62ca0b005bc115a405ae3fb7c00bc36a9 Mon Sep 17 00:00:00 2001 From: linyouchong Date: Fri, 2 Feb 2018 17:36:03 +0800 Subject: [PATCH] fix TODO:change to a api-server watch --- pkg/volume/csi/BUILD | 3 + pkg/volume/csi/csi_attacher.go | 164 +++++++++++++++++------- pkg/volume/csi/csi_attacher_test.go | 186 +++++++++++++++++++++------- 3 files changed, 264 insertions(+), 89 deletions(-) diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 3d2b81746a0..33867ffbe44 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -24,6 +24,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) @@ -51,7 +52,9 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", ], ) diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index c20564b79a2..3a79846a55e 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -30,6 +30,7 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/volume" ) @@ -117,37 +118,79 @@ func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1. func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) { glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) - ticker := time.NewTicker(c.waitSleepTime) - defer ticker.Stop() - timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable defer timer.Stop() - //TODO (vladimirvivien) instead of polling api-server, change to a api-server watch + return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout) +} + +func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) { + glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) + attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + glog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) + return "", err + } + // if being deleted, fail fast + if attach.GetDeletionTimestamp() != nil { + glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID)) + return "", errors.New("volume attachment is being deleted") + } + // attachment OK + if attach.Status.Attached { + return attachID, nil + } + // driver reports attach error + attachErr := attach.Status.AttachError + if attachErr != nil { + glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) + return "", errors.New(attachErr.Message) + } + + watcher, err := c.k8s.StorageV1beta1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion})) + if err != nil { + return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) + } + + ch := watcher.ResultChan() + defer watcher.Stop() + for { select { - case <-ticker.C: - glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) - attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) - if err != nil { - glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err)) - continue + case event, ok := <-ch: + if !ok { + glog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID) + return "", errors.New("volume attachment watch channel had been closed") } - // if being deleted, fail fast - if attach.GetDeletionTimestamp() != nil { - glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID)) - return "", errors.New("volume attachment is being deleted") - } - // attachment OK - if attach.Status.Attached { - return attachID, nil - } - // driver reports attach error - attachErr := attach.Status.AttachError - if attachErr != nil { - glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) - return "", errors.New(attachErr.Message) + + switch event.Type { + case watch.Added, watch.Modified: + attach, _ := event.Object.(*storage.VolumeAttachment) + // if being deleted, fail fast + if attach.GetDeletionTimestamp() != nil { + glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID)) + return "", errors.New("volume attachment is being deleted") + } + // attachment OK + if attach.Status.Attached { + return attachID, nil + } + // driver reports attach error + attachErr := attach.Status.AttachError + if attachErr != nil { + glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message)) + return "", errors.New(attachErr.Message) + } + case watch.Deleted: + // if deleted, fail fast + glog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID)) + return "", errors.New("volume attachment has been deleted") + + case watch.Error: + // start another cycle + c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout) } + case <-timer.C: glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle) @@ -224,38 +267,69 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error { func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error { glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID)) - ticker := time.NewTicker(c.waitSleepTime) - defer ticker.Stop() - timeout := c.waitSleepTime * 10 timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable defer timer.Stop() - //TODO (vladimirvivien) instead of polling api-server, change to a api-server watch + return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) +} + +func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error { + glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) + attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + //object deleted or never existed, done + glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle)) + return nil + } + glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) + return err + } + // driver reports attach error + detachErr := attach.Status.DetachError + if detachErr != nil { + glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) + return errors.New(detachErr.Message) + } + + watcher, err := c.k8s.StorageV1beta1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion})) + if err != nil { + return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle) + } + ch := watcher.ResultChan() + defer watcher.Stop() + for { select { - case <-ticker.C: - glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) - attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{}) - if err != nil { - if apierrs.IsNotFound(err) { - //object deleted or never existed, done - glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle)) - return nil - } - glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err)) - continue + case event, ok := <-ch: + if !ok { + glog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID) + return errors.New("volume attachment watch channel had been closed") } - // driver reports attach error - detachErr := attach.Status.DetachError - if detachErr != nil { - glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) - return errors.New(detachErr.Message) + switch event.Type { + case watch.Added, watch.Modified: + attach, _ := event.Object.(*storage.VolumeAttachment) + // driver reports attach error + detachErr := attach.Status.DetachError + if detachErr != nil { + glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message)) + return errors.New(detachErr.Message) + } + case watch.Deleted: + //object deleted + glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle)) + return nil + + case watch.Error: + // start another cycle + c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout) } + case <-timer.C: glog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID)) - return fmt.Errorf("detachment timed out for volume %v", volumeHandle) + return fmt.Errorf("detachment timeout for volume %v", volumeHandle) } } } diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index b2ed1c35527..4b0d67baa2b 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -26,7 +26,12 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + fakeclient "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" ) func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment { @@ -50,15 +55,6 @@ func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttach } func TestAttacherAttach(t *testing.T) { - plug, tmpDir := newTestPlugin(t) - defer os.RemoveAll(tmpDir) - - attacher, err := plug.NewAttacher() - if err != nil { - t.Fatalf("failed to create new attacher: %v", err) - } - - csiAttacher := attacher.(*csiAttacher) testCases := []struct { name string @@ -111,6 +107,17 @@ func TestAttacherAttach(t *testing.T) { // attacher loop for i, tc := range testCases { t.Logf("test case: %s", tc.name) + + plug, fakeWatcher, tmpDir := newTestWatchPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err := plug.NewAttacher() + if err != nil { + t.Fatalf("failed to create new attacher: %v", err) + } + + csiAttacher := attacher.(*csiAttacher) + spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false) go func(id, nodename string, fail bool) { @@ -143,18 +150,21 @@ func TestAttacherAttach(t *testing.T) { } if attach == nil { - t.Error("attachment not found") - } - attach.Status.Attached = true - _, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Update(attach) - if err != nil { - t.Error(err) + t.Logf("attachment not found for id:%v", tc.attachID) + } else { + attach.Status.Attached = true + _, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Update(attach) + if err != nil { + t.Error(err) + } + fakeWatcher.Modify(attach) } } } func TestAttacherWaitForVolumeAttachment(t *testing.T) { - plug, tmpDir := newTestPlugin(t) + + plug, fakeWatcher, tmpDir := newTestWatchPlugin(t) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -165,42 +175,92 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { nodeName := "test-node" testCases := []struct { - name string - attached bool - attachErr *storage.VolumeError - sleepTime time.Duration - timeout time.Duration - shouldFail bool + name string + initAttached bool + finalAttached bool + trigerWatchEventTime time.Duration + initAttachErr *storage.VolumeError + finalAttachErr *storage.VolumeError + sleepTime time.Duration + timeout time.Duration + shouldFail bool }{ - {name: "attach ok", attached: true, sleepTime: 10 * time.Millisecond, timeout: 50 * time.Millisecond}, - {name: "attachment error", attachErr: &storage.VolumeError{Message: "missing volume"}, sleepTime: 10 * time.Millisecond, timeout: 30 * time.Millisecond}, - {name: "time ran out", attached: false, sleepTime: 5 * time.Millisecond}, + { + name: "attach success at get", + initAttached: true, + sleepTime: 10 * time.Millisecond, + timeout: 50 * time.Millisecond, + shouldFail: false, + }, + { + name: "attachment error ant get", + initAttachErr: &storage.VolumeError{Message: "missing volume"}, + sleepTime: 10 * time.Millisecond, + timeout: 30 * time.Millisecond, + shouldFail: true, + }, + { + name: "attach success at watch", + initAttached: false, + finalAttached: true, + trigerWatchEventTime: 5 * time.Millisecond, + timeout: 50 * time.Millisecond, + sleepTime: 5 * time.Millisecond, + shouldFail: false, + }, + { + name: "attachment error ant watch", + initAttached: false, + finalAttached: false, + finalAttachErr: &storage.VolumeError{Message: "missing volume"}, + trigerWatchEventTime: 5 * time.Millisecond, + sleepTime: 10 * time.Millisecond, + timeout: 30 * time.Millisecond, + shouldFail: true, + }, + { + name: "time ran out", + initAttached: false, + finalAttached: true, + trigerWatchEventTime: 100 * time.Millisecond, + timeout: 50 * time.Millisecond, + sleepTime: 5 * time.Millisecond, + shouldFail: true, + }, } for i, tc := range testCases { - t.Logf("running test: %s", tc.name) + fakeWatcher.Reset() + t.Logf("running test: %v", tc.name) pvName := fmt.Sprintf("test-pv-%d", i) volID := fmt.Sprintf("test-vol-%d", i) attachID := getAttachmentName(volID, testDriver, nodeName) attachment := makeTestAttachment(attachID, nodeName, pvName) - attachment.Status.Attached = tc.attached - attachment.Status.AttachError = tc.attachErr + attachment.Status.Attached = tc.initAttached + attachment.Status.AttachError = tc.initAttachErr csiAttacher.waitSleepTime = tc.sleepTime + _, err := csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } - go func() { - _, err := csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) - if err != nil { - t.Fatalf("failed to attach: %v", err) - } - }() + // after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment + if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { + go func() { + time.Sleep(tc.trigerWatchEventTime) + attachment.Status.Attached = tc.finalAttached + attachment.Status.AttachError = tc.finalAttachErr + fakeWatcher.Modify(attachment) + }() + } retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout) if tc.shouldFail && err == nil { t.Error("expecting failure, but err is nil") } - if tc.attachErr != nil { - if tc.attachErr.Message != err.Error() { - t.Errorf("expecting error [%v], got [%v]", tc.attachErr.Message, err.Error()) + if tc.initAttachErr != nil { + if tc.initAttachErr.Message != err.Error() { + t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error()) } } if err == nil && retID != attachID { @@ -268,14 +328,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { } func TestAttacherDetach(t *testing.T) { - plug, tmpDir := newTestPlugin(t) - defer os.RemoveAll(tmpDir) - attacher, err := plug.NewAttacher() - if err != nil { - t.Fatalf("failed to create new attacher: %v", err) - } - csiAttacher := attacher.(*csiAttacher) nodeName := "test-node" testCases := []struct { name string @@ -289,6 +342,16 @@ func TestAttacherDetach(t *testing.T) { } for _, tc := range testCases { + t.Logf("running test: %v", tc.name) + plug, fakeWatcher, tmpDir := newTestWatchPlugin(t) + defer os.RemoveAll(tmpDir) + + attacher, err0 := plug.NewAttacher() + if err0 != nil { + t.Fatalf("failed to create new attacher: %v", err0) + } + csiAttacher := attacher.(*csiAttacher) + pv := makeTestPV("test-pv", 10, testDriver, tc.volID) spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) attachment := makeTestAttachment(tc.attachID, nodeName, "test-pv") @@ -300,6 +363,9 @@ func TestAttacherDetach(t *testing.T) { if err != nil { t.Errorf("test case %s failed: %v", tc.name, err) } + go func() { + fakeWatcher.Delete(attachment) + }() err = csiAttacher.Detach(volumeName, types.NodeName(nodeName)) if tc.shouldFail && err == nil { t.Fatal("expecting failure, but err = nil") @@ -319,3 +385,35 @@ func TestAttacherDetach(t *testing.T) { } } } + +// create a plugin mgr to load plugins and setup a fake client +func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.FakeWatcher, string) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + + fakeClient := fakeclient.NewSimpleClientset() + fakeWatcher := watch.NewFake() + fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil)) + fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1] + host := volumetest.NewFakeVolumeHost( + tmpDir, + fakeClient, + nil, + ) + plugMgr := &volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + + plug, err := plugMgr.FindPluginByName(csiPluginName) + if err != nil { + t.Fatalf("can't find plugin %v", csiPluginName) + } + + csiPlug, ok := plug.(*csiPlugin) + if !ok { + t.Fatalf("cannot assert plugin to be type csiPlugin") + } + + return csiPlug, fakeWatcher, tmpDir +}