Poll for VolumeAttachments in CSI attacher

The CSI attacher that runs inside of the AttachDetachController has
access to a VolumeAttachment lister. By polling this lister for the
status of VolumeAttachments, we can save threads on the API server by
not using watches.
This commit is contained in:
Chris Henzie 2021-02-02 10:30:34 -08:00
parent 12e85e0e1c
commit f36fec997a
4 changed files with 171 additions and 162 deletions

View File

@ -29,6 +29,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -26,6 +26,7 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
@ -34,6 +35,7 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume"
@ -76,6 +78,12 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
node := string(nodeName)
attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
attachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
if err != nil && !apierrors.IsNotFound(err) {
return "", errors.New(log("failed to get volume attachment from lister: %v", err))
}
if attachment == nil {
var vaSrc storage.VolumeAttachmentSource
if spec.InlineVolumeSpecForCSIMigration {
// inline PV scenario - use PV spec to populate VA source.
@ -107,21 +115,19 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
}
_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
alreadyExist := false
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return "", errors.New(log("attacher.Attach failed: %v", err))
}
alreadyExist = true
}
if alreadyExist {
klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
} else {
klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
}
}
if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
// and has access to a VolumeAttachment lister that can be polled for the current status.
if err := c.waitForVolumeAttachmentWithLister(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
return "", err
}
@ -166,6 +172,32 @@ func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID str
return attach.Name, nil
}
func (c *csiAttacher) waitForVolumeAttachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
verifyStatus := func() (bool, error) {
volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
if err != nil {
// Ignore "not found" errors in case the VolumeAttachment was just created and hasn't yet made it into the lister.
if !apierrors.IsNotFound(err) {
klog.Error(log("unexpected error waiting for volume attachment, %v", err))
return false, err
}
// The VolumeAttachment is not available yet and we will have to try again.
return false, nil
}
successful, err := verifyAttachmentStatus(volumeAttachment, volumeHandle)
if err != nil {
return false, err
}
return successful, nil
}
return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Attach")
}
func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
@ -399,36 +431,70 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
}
klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout)
// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
// and has access to a VolumeAttachment lister that can be polled for the current status.
return c.waitForVolumeDetachmentWithLister(volID, attachID, c.watchTimeout)
}
func (c *csiAttacher) waitForVolumeDetachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
verifyStatus := func() (bool, error) {
volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
if err != nil {
if !apierrors.IsNotFound(err) {
return false, errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
}
// Detachment successful.
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
return true, nil
}
// Detachment is only "successful" once the VolumeAttachment is deleted, however we perform
// this check to make sure the object does not contain any detach errors.
successful, err := verifyDetachmentStatus(volumeAttachment, volumeHandle)
if err != nil {
return false, err
}
return successful, nil
}
return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Detach")
}
func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID string, timeout time.Duration, verifyStatus func() (bool, error), operation string) error {
var (
initBackoff = 500 * time.Millisecond
// This is approximately the duration between consecutive ticks after two minutes (CSI timeout).
maxBackoff = 7 * time.Second
resetDuration = time.Minute
backoffFactor = 1.05
jitter = 0.1
clock = &clock.RealClock{}
)
backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
defer backoffMgr.Backoff().Stop()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
select {
case <-backoffMgr.Backoff().C():
successful, err := verifyStatus()
if err != nil {
return err
}
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string, timeout time.Duration) error {
klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer,
timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
//object deleted or never existed, done
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
if successful {
return nil
}
return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
case <-ctx.Done():
klog.Error(log("%s timeout after %v [volume=%v; attachment.ID=%v]", operation, timeout, volumeHandle, attachID))
return fmt.Errorf("%s timeout for volume %v", operation, volumeHandle)
}
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus)
if err != nil {
return err
}
return err
}
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,

View File

@ -199,12 +199,9 @@ func TestAttacherAttach(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Logf("test case: %s", tc.name)
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
@ -234,12 +231,10 @@ func TestAttacherAttach(t *testing.T) {
status.AttachError = &storage.VolumeError{
Message: "attacher error",
}
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
fakeWatcher.Error(&errStatus)
} else {
status.Attached = true
}
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status)
markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status)
wg.Wait()
})
}
@ -291,12 +286,9 @@ func TestAttacherAttachWithInline(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Logf("test case: %s", tc.name)
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
@ -325,7 +317,7 @@ func TestAttacherAttachWithInline(t *testing.T) {
} else {
status.Attached = true
}
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status)
markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status)
wg.Wait()
})
}
@ -366,25 +358,9 @@ func TestAttacherWithCSIDriver(t *testing.T) {
getTestCSIDriver("attachable", nil, &bTrue, nil),
getTestCSIDriver("nil", nil, nil, nil),
)
plug, tmpDir := newTestPlugin(t, fakeClient)
plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir)
attachmentWatchCreated := make(chan core.Action)
// Make sure this is the first reactor
fakeClient.Fake.PrependWatchReactor("volumeattachments", func(action core.Action) (bool, watch.Interface, error) {
select {
case <-attachmentWatchCreated:
// already closed
default:
// The attacher is already watching the attachment, notify the test goroutine to
// update the status of attachment.
// TODO: In theory this still has a race condition, because the actual watch is created by
// the next reactor in the chain and we unblock the test goroutine before returning here.
close(attachmentWatchCreated)
}
return false, nil, nil
})
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
@ -423,9 +399,6 @@ func TestAttacherWithCSIDriver(t *testing.T) {
status := storage.VolumeAttachmentStatus{
Attached: true,
}
// We want to ensure the watcher, which is created in csiAttacher,
// has been started before updating the status of attachment.
<-attachmentWatchCreated
markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status)
}
wg.Wait()
@ -827,7 +800,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, nil)
defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher()
@ -951,7 +924,6 @@ func TestAttacherDetach(t *testing.T) {
volID string
attachID string
shouldFail bool
watcherError bool
reactor func(action core.Action) (handled bool, ret runtime.Object, err error)
}{
{name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)},
@ -970,31 +942,15 @@ func TestAttacherDetach(t *testing.T) {
return false, nil, nil
},
},
{
name: "API watch error happen",
volID: "vol-005",
attachID: getAttachmentName("vol-005", testDriver, nodeName),
shouldFail: true,
watcherError: true,
reactor: func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.Matches("get", "volumeattachments") {
return true, makeTestAttachment(getAttachmentName("vol-005", testDriver, nodeName), nodeName, "vol-005"), nil
}
return false, nil, nil
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("running test: %v", tc.name)
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
if tc.reactor != nil {
fakeClient.PrependReactor("*", "*", tc.reactor)
}
@ -1016,18 +972,7 @@ func TestAttacherDetach(t *testing.T) {
if err != nil {
t.Errorf("test case %s failed: %v", tc.name, err)
}
watchError := tc.watcherError
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if watchError {
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
fakeWatcher.Error(&errStatus)
return
}
fakeWatcher.Delete(attachment)
}()
err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))
if tc.shouldFail && err == nil {
t.Fatal("expecting failure, but err = nil")
@ -1045,7 +990,6 @@ func TestAttacherDetach(t *testing.T) {
t.Errorf("expecting attachment not to be nil, but it is")
}
}
wg.Wait()
})
}
}
@ -1239,9 +1183,6 @@ func TestAttacherMountDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err0 := plug.NewAttacher()
if err0 != nil {
t.Fatalf("failed to create new attacher: %v", err0)
@ -1255,7 +1196,6 @@ func TestAttacherMountDevice(t *testing.T) {
nodeName := string(csiAttacher.plugin.host.GetNodeName())
attachID := getAttachmentName(tc.volName, testDriver, nodeName)
var wg sync.WaitGroup
if tc.createAttachment {
// Set up volume attachment
@ -1264,11 +1204,6 @@ func TestAttacherMountDevice(t *testing.T) {
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
fakeWatcher.Delete(attachment)
}()
}
parent := filepath.Dir(tc.deviceMountPath)
@ -1359,8 +1294,6 @@ func TestAttacherMountDevice(t *testing.T) {
}
}
}
wg.Wait()
})
}
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
fakeclient "k8s.io/client-go/kubernetes/fake"
@ -274,7 +273,6 @@ func TestCSI_VolumeAll(t *testing.T) {
})
client := fakeclient.NewSimpleClientset(objs...)
fakeWatcher := watch.NewRaceFreeFake()
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
csiDriverInformer := factory.Storage().V1().CSIDrivers()
@ -282,10 +280,11 @@ func TestCSI_VolumeAll(t *testing.T) {
if driverInfo != nil {
csiDriverInformer.Informer().GetStore().Add(driverInfo)
}
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t,
attachDetachVolumeHost := volumetest.NewFakeAttachDetachVolumeHostWithCSINodeName(t,
tmpDir,
client,
ProbeVolumePlugins(),
@ -293,18 +292,18 @@ func TestCSI_VolumeAll(t *testing.T) {
csiDriverInformer.Lister(),
volumeAttachmentInformer.Lister(),
)
plugMgr := host.GetPluginMgr()
attachDetachPlugMgr := attachDetachVolumeHost.GetPluginMgr()
csiClient := setupClient(t, true)
volSpec := test.specFunc(test.specName, test.driver, test.volName)
pod := test.podFunc()
attachName := getAttachmentName(test.volName, test.driver, string(host.GetNodeName()))
attachName := getAttachmentName(test.volName, test.driver, string(attachDetachVolumeHost.GetNodeName()))
t.Log("csiTest.VolumeAll starting...")
// *************** Attach/Mount volume resources ****************//
// attach volume
t.Log("csiTest.VolumeAll Attaching volume...")
attachPlug, err := plugMgr.FindAttachablePluginBySpec(volSpec)
attachPlug, err := attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec)
if err != nil {
if !test.shouldFail {
t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err)
@ -333,10 +332,8 @@ func TestCSI_VolumeAll(t *testing.T) {
}
// creates VolumeAttachment and blocks until it is marked attached (done by external attacher)
attachDone := make(chan struct{})
go func() {
defer close(attachDone)
attachID, err := volAttacher.Attach(volSpec, host.GetNodeName())
attachID, err := volAttacher.Attach(volSpec, attachDetachVolumeHost.GetNodeName())
if err != nil {
t.Errorf("csiTest.VolumeAll attacher.Attach failed: %s", err)
return
@ -345,8 +342,7 @@ func TestCSI_VolumeAll(t *testing.T) {
}()
// Simulates external-attacher and marks VolumeAttachment.Status.Attached = true
markVolumeAttached(t, host.GetKubeClient(), fakeWatcher, attachName, storage.VolumeAttachmentStatus{Attached: true})
<-attachDone
markVolumeAttached(t, attachDetachVolumeHost.GetKubeClient(), nil, attachName, storage.VolumeAttachmentStatus{Attached: true})
// Observe attach on this node.
devicePath, err = volAttacher.WaitForAttach(volSpec, "", pod, 500*time.Millisecond)
@ -364,9 +360,22 @@ func TestCSI_VolumeAll(t *testing.T) {
t.Log("csiTest.VolumeAll volume attacher not found, skipping attachment")
}
// The reason for separate volume hosts here is because the attach/detach behavior is exclusive to the
// CSI plugin running in the AttachDetachController. Similarly, the mount/unmount behavior is exclusive
// to the CSI plugin running in the Kubelet.
kubeletVolumeHost := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t,
tmpDir,
client,
ProbeVolumePlugins(),
"fakeNode",
csiDriverInformer.Lister(),
volumeAttachmentInformer.Lister(),
)
kubeletPlugMgr := kubeletVolumeHost.GetPluginMgr()
// Mount Device
t.Log("csiTest.VolumeAll Mouting device...")
devicePlug, err := plugMgr.FindDeviceMountablePluginBySpec(volSpec)
devicePlug, err := kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec)
if err != nil {
t.Fatalf("csiTest.VolumeAll PluginManager.FindDeviceMountablePluginBySpec failed: %v", err)
}
@ -403,7 +412,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// mount volume
t.Log("csiTest.VolumeAll Mouting volume...")
volPlug, err := plugMgr.FindPluginBySpec(volSpec)
volPlug, err := kubeletPlugMgr.FindPluginBySpec(volSpec)
if err != nil || volPlug == nil {
t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err)
}
@ -499,7 +508,7 @@ func TestCSI_VolumeAll(t *testing.T) {
t.Log("csiTest.VolumeAll Tearing down...")
// unmount volume
t.Log("csiTest.VolumeAll Unmouting volume...")
volPlug, err = plugMgr.FindPluginBySpec(volSpec)
volPlug, err = kubeletPlugMgr.FindPluginBySpec(volSpec)
if err != nil || volPlug == nil {
t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err)
}
@ -525,7 +534,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// unmount device
t.Log("csiTest.VolumeAll Unmouting device...")
devicePlug, err = plugMgr.FindDeviceMountablePluginBySpec(volSpec)
devicePlug, err = kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec)
if err != nil {
t.Fatalf("csiTest.VolumeAll failed to create mountable device plugin: %s", err)
}
@ -569,7 +578,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// detach volume
t.Log("csiTest.VolumeAll Detaching volume...")
attachPlug, err = plugMgr.FindAttachablePluginBySpec(volSpec)
attachPlug, err = attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec)
if err != nil {
t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err)
}
@ -594,7 +603,7 @@ func TestCSI_VolumeAll(t *testing.T) {
}
csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher)
csiDetacher.csiClient = csiClient
if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil {
if err := csiDetacher.Detach(volName, attachDetachVolumeHost.GetNodeName()); err != nil {
t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err)
}
t.Log("csiTest.VolumeAll detacher.Detach succeeded for volume", volName)