Merge pull request #57194 from linyouchong/linyouchong-20171214

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

fix TODO: change to an api-server watch

What this PR does / why we need it:
fix TODO (vladimirvivien): instead of polling the api-server, change to an api-server watch

Which issue(s) this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close the issue(s) when PR gets merged):
Fixes #58031

Special notes for your reviewer:

Release note:
NONE

@vladimirvivien, @jsafrane, @saad-ali
I saw a TODO there and I am very interested in fixing it.
Please let me know if it is inappropriate for me to do this.
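
For context on the diff below: the change replaces the Get-in-a-ticker polling loop inside waitForVolumeAttachment and waitForVolumeDetachment with an api-server watch scoped to the single VolumeAttachment object, starting from the ResourceVersion returned by an initial Get. The following is a minimal editor's sketch of that pattern, not the PR's exact code; the function name waitForAttached and its parameters are illustrative, and it assumes the same pre-context client-go call signatures used in the diff.

// Editor's sketch (not part of the diff): wait for a VolumeAttachment to
// report Attached using an api-server watch instead of polling.
// waitForAttached is an illustrative name, not a function from the PR.
package example

import (
    "errors"
    "fmt"
    "time"

    storage "k8s.io/api/storage/v1beta1"
    meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/kubernetes"
)

func waitForAttached(client kubernetes.Interface, attachID string, timeout time.Duration) error {
    // One initial Get establishes the current state and the ResourceVersion
    // to start watching from, so no update is missed between Get and Watch.
    attach, err := client.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
    if err != nil {
        return err
    }
    if attach.Status.Attached {
        return nil
    }

    // meta.SingleObject scopes the watch to exactly this VolumeAttachment.
    watcher, err := client.StorageV1beta1().VolumeAttachments().Watch(
        meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
    if err != nil {
        return fmt.Errorf("watch error: %v for attachment %s", err, attachID)
    }
    defer watcher.Stop()

    timer := time.NewTimer(timeout)
    defer timer.Stop()

    for {
        select {
        case event, ok := <-watcher.ResultChan():
            if !ok {
                return errors.New("volume attachment watch channel was closed")
            }
            switch event.Type {
            case watch.Added, watch.Modified:
                va, _ := event.Object.(*storage.VolumeAttachment)
                if va == nil {
                    continue
                }
                if va.Status.Attached {
                    return nil
                }
                if va.Status.AttachError != nil {
                    return errors.New(va.Status.AttachError.Message)
                }
            case watch.Deleted:
                return errors.New("volume attachment was deleted while waiting")
            }
        case <-timer.C:
            return fmt.Errorf("attachment timeout for %s", attachID)
        }
    }
}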
Kubernetes Submit Queue 2018-02-06 23:35:13 -08:00 committed by GitHub
commit 11104d75f1
3 changed files with 264 additions and 89 deletions

View File

@@ -24,6 +24,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
@@ -51,7 +52,9 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)

View File

@@ -30,6 +30,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
)
@@ -117,21 +118,18 @@ func (c *csiAttacher) WaitForAttach(spec *volume.Spec, attachID string, pod *v1.
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
glog.Error(log("attacher.WaitForAttach failed (will continue to try): %v", err))
continue
glog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
return "", err
}
// if being deleted, fail fast
if attach.GetDeletionTimestamp() != nil {
@@ -148,6 +146,51 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim
glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return "", errors.New(attachErr.Message)
}
watcher, err := c.k8s.StorageV1beta1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case event, ok := <-ch:
if !ok {
glog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
return "", errors.New("volume attachment watch channel had been closed")
}
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
// if being deleted, fail fast
if attach.GetDeletionTimestamp() != nil {
glog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment is being deleted")
}
// attachment OK
if attach.Status.Attached {
return attachID, nil
}
// driver reports attach error
attachErr := attach.Status.AttachError
if attachErr != nil {
glog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return "", errors.New(attachErr.Message)
}
case watch.Deleted:
// if deleted, fail fast
glog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment has been deleted")
case watch.Error:
// start another cycle
c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
}
case <-timer.C:
glog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
@@ -224,17 +267,14 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
glog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
ticker := time.NewTicker(c.waitSleepTime)
defer ticker.Stop()
timeout := c.waitSleepTime * 10
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
//TODO (vladimirvivien) instead of polling api-server, change to a api-server watch
for {
select {
case <-ticker.C:
return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error {
glog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1beta1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
@@ -244,18 +284,52 @@ func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) err
return nil
}
glog.Error(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
continue
return err
}
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
}
watcher, err := c.k8s.StorageV1beta1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case event, ok := <-ch:
if !ok {
glog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
return errors.New("volume attachment watch channel had been closed")
}
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
glog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
}
case watch.Deleted:
//object deleted
glog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
return nil
case watch.Error:
// start another cycle
c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}
case <-timer.C:
glog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("detachment timed out for volume %v", volumeHandle)
return fmt.Errorf("detachment timeout for volume %v", volumeHandle)
}
}
}

View File

@@ -26,7 +26,12 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttachment {
@@ -50,15 +55,6 @@ func makeTestAttachment(attachID, nodeName, pvName string) *storage.VolumeAttach
}
func TestAttacherAttach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
testCases := []struct {
name string
@@ -111,6 +107,17 @@ func TestAttacherAttach(t *testing.T) {
// attacher loop
for i, tc := range testCases {
t.Logf("test case: %s", tc.name)
plug, fakeWatcher, tmpDir := newTestWatchPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
spec := volume.NewSpecFromPersistentVolume(makeTestPV(fmt.Sprintf("test-pv%d", i), 10, tc.driverName, tc.volumeName), false)
go func(id, nodename string, fail bool) {
@@ -143,18 +150,21 @@ func TestAttacherAttach(t *testing.T) {
}
if attach == nil {
t.Error("attachment not found")
}
t.Logf("attachment not found for id:%v", tc.attachID)
} else {
attach.Status.Attached = true
_, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Update(attach)
if err != nil {
t.Error(err)
}
fakeWatcher.Modify(attach)
}
}
}
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
plug, fakeWatcher, tmpDir := newTestWatchPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@@ -166,41 +176,91 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
testCases := []struct {
name string
attached bool
attachErr *storage.VolumeError
initAttached bool
finalAttached bool
trigerWatchEventTime time.Duration
initAttachErr *storage.VolumeError
finalAttachErr *storage.VolumeError
sleepTime time.Duration
timeout time.Duration
shouldFail bool
}{
{name: "attach ok", attached: true, sleepTime: 10 * time.Millisecond, timeout: 50 * time.Millisecond},
{name: "attachment error", attachErr: &storage.VolumeError{Message: "missing volume"}, sleepTime: 10 * time.Millisecond, timeout: 30 * time.Millisecond},
{name: "time ran out", attached: false, sleepTime: 5 * time.Millisecond},
{
name: "attach success at get",
initAttached: true,
sleepTime: 10 * time.Millisecond,
timeout: 50 * time.Millisecond,
shouldFail: false,
},
{
name: "attachment error ant get",
initAttachErr: &storage.VolumeError{Message: "missing volume"},
sleepTime: 10 * time.Millisecond,
timeout: 30 * time.Millisecond,
shouldFail: true,
},
{
name: "attach success at watch",
initAttached: false,
finalAttached: true,
trigerWatchEventTime: 5 * time.Millisecond,
timeout: 50 * time.Millisecond,
sleepTime: 5 * time.Millisecond,
shouldFail: false,
},
{
name: "attachment error ant watch",
initAttached: false,
finalAttached: false,
finalAttachErr: &storage.VolumeError{Message: "missing volume"},
trigerWatchEventTime: 5 * time.Millisecond,
sleepTime: 10 * time.Millisecond,
timeout: 30 * time.Millisecond,
shouldFail: true,
},
{
name: "time ran out",
initAttached: false,
finalAttached: true,
trigerWatchEventTime: 100 * time.Millisecond,
timeout: 50 * time.Millisecond,
sleepTime: 5 * time.Millisecond,
shouldFail: true,
},
}
for i, tc := range testCases {
t.Logf("running test: %s", tc.name)
fakeWatcher.Reset()
t.Logf("running test: %v", tc.name)
pvName := fmt.Sprintf("test-pv-%d", i)
volID := fmt.Sprintf("test-vol-%d", i)
attachID := getAttachmentName(volID, testDriver, nodeName)
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = tc.attached
attachment.Status.AttachError = tc.attachErr
attachment.Status.Attached = tc.initAttached
attachment.Status.AttachError = tc.initAttachErr
csiAttacher.waitSleepTime = tc.sleepTime
go func() {
_, err := csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Create(attachment)
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment
if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout {
go func() {
time.Sleep(tc.trigerWatchEventTime)
attachment.Status.Attached = tc.finalAttached
attachment.Status.AttachError = tc.finalAttachErr
fakeWatcher.Modify(attachment)
}()
}
retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
if tc.shouldFail && err == nil {
t.Error("expecting failure, but err is nil")
}
if tc.attachErr != nil {
if tc.attachErr.Message != err.Error() {
t.Errorf("expecting error [%v], got [%v]", tc.attachErr.Message, err.Error())
if tc.initAttachErr != nil {
if tc.initAttachErr.Message != err.Error() {
t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error())
}
}
if err == nil && retID != attachID {
@@ -268,14 +328,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
}
func TestAttacherDetach(t *testing.T) {
plug, tmpDir := newTestPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node"
testCases := []struct {
name string
@@ -289,6 +342,16 @@ func TestAttacherDetach(t *testing.T) {
}
for _, tc := range testCases {
t.Logf("running test: %v", tc.name)
plug, fakeWatcher, tmpDir := newTestWatchPlugin(t)
defer os.RemoveAll(tmpDir)
attacher, err0 := plug.NewAttacher()
if err0 != nil {
t.Fatalf("failed to create new attacher: %v", err0)
}
csiAttacher := attacher.(*csiAttacher)
pv := makeTestPV("test-pv", 10, testDriver, tc.volID)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
attachment := makeTestAttachment(tc.attachID, nodeName, "test-pv")
@@ -300,6 +363,9 @@ func TestAttacherDetach(t *testing.T) {
if err != nil {
t.Errorf("test case %s failed: %v", tc.name, err)
}
go func() {
fakeWatcher.Delete(attachment)
}()
err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))
if tc.shouldFail && err == nil {
t.Fatal("expecting failure, but err = nil")
@@ -319,3 +385,35 @@ func TestAttacherDetach(t *testing.T) {
}
}
}
// create a plugin mgr to load plugins and setup a fake client
func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.FakeWatcher, string) {
tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil {
t.Fatalf("can't create temp dir: %v", err)
}
fakeClient := fakeclient.NewSimpleClientset()
fakeWatcher := watch.NewFake()
fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1]
host := volumetest.NewFakeVolumeHost(
tmpDir,
fakeClient,
nil,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csiPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
}
csiPlug, ok := plug.(*csiPlugin)
if !ok {
t.Fatalf("cannot assert plugin to be type csiPlugin")
}
return csiPlug, fakeWatcher, tmpDir
}