Merge pull request #126961 from carlory/fix-124398

kubelet: csi plugin stop watching the volumeattachment object
This commit is contained in:
Kubernetes Prow Robot 2024-08-28 20:41:40 +01:00 committed by GitHub
commit e74205b08a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 13 additions and 189 deletions

View File

@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
@ -54,8 +53,6 @@ type csiAttacher struct {
csiClient csiClient
}
type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}
@ -141,38 +138,34 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
return "", nil
}
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
// WaitForAttach waits for the attach operation to complete and returns the device path when it is done.
// But in this case, there should be no waiting. The device is found by the CSI driver later, in NodeStage / NodePublish calls.
// so it should just return device metadata, in this case it is VolumeAttachment name. If the target VolumeAttachment does not
// exist or is not attached, the function will return an error. And then the caller (kubelet) should retry it.
// We can get rid of watching it that serves no purpose. More details in https://issues.k8s.io/124398
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, _ time.Duration) (string, error) {
source, err := getPVSourceFromSpec(spec)
if err != nil {
return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
}
volumeHandle := source.VolumeHandle
attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
}
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
if err != nil {
klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
}
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
successful, err := verifyAttachmentStatus(attach, volumeHandle)
if err != nil {
return "", err
}
if !successful {
klog.Error(log("attacher.WaitForAttach failed for volume [%s] attached (will continue to try)", volumeHandle))
return "", fmt.Errorf("volume %v is not attached for volume attachment %v", volumeHandle, attachID)
}
return attach.Name, nil
}
@ -532,62 +525,6 @@ func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spe
}
}
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}
watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case event, ok := <-ch:
if !ok {
klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
return errors.New("volume attachment watch channel had been closed")
}
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}
case watch.Deleted:
// set attach nil to get different results
// for detachment, a deleted event means successful detachment, should return success
// for attachment, should return fail
if successful, err := verifyStatus(nil, volumeHandle); !successful {
return err
}
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
return nil
case watch.Error:
klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
}
case <-timer.C:
klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
}
}
}
func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))

View File

@ -649,119 +649,6 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
}
}
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
nodeName := "fakeNode"
testCases := []struct {
name string
initAttached bool
finalAttached bool
trigerWatchEventTime time.Duration
initAttachErr *storage.VolumeError
finalAttachErr *storage.VolumeError
timeout time.Duration
shouldFail bool
watchTimeout time.Duration
}{
{
name: "attach success at get",
initAttached: true,
timeout: 50 * time.Millisecond,
shouldFail: false,
},
{
name: "attachment error ant get",
initAttachErr: &storage.VolumeError{Message: "missing volume"},
timeout: 30 * time.Millisecond,
shouldFail: true,
},
{
name: "attach success at watch",
initAttached: false,
finalAttached: true,
trigerWatchEventTime: 5 * time.Millisecond,
timeout: 50 * time.Millisecond,
shouldFail: false,
},
{
name: "attachment error ant watch",
initAttached: false,
finalAttached: false,
finalAttachErr: &storage.VolumeError{Message: "missing volume"},
trigerWatchEventTime: 5 * time.Millisecond,
timeout: 30 * time.Millisecond,
shouldFail: true,
},
{
name: "time ran out",
initAttached: false,
finalAttached: true,
trigerWatchEventTime: 100 * time.Millisecond,
timeout: 50 * time.Millisecond,
shouldFail: true,
},
}
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err := plug.NewAttacher()
if err != nil {
t.Fatalf("failed to create new attacher: %v", err)
}
csiAttacher := getCsiAttacherFromVolumeAttacher(attacher, tc.watchTimeout)
t.Logf("running test: %v", tc.name)
pvName := fmt.Sprintf("test-pv-%d", i)
volID := fmt.Sprintf("test-vol-%d", i)
attachID := getAttachmentName(volID, testDriver, nodeName)
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = tc.initAttached
attachment.Status.AttachError = tc.initAttachErr
_, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to attach: %v", err)
}
trigerWatchEventTime := tc.trigerWatchEventTime
finalAttached := tc.finalAttached
finalAttachErr := tc.finalAttachErr
var wg sync.WaitGroup
// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment
if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(trigerWatchEventTime)
attachment := makeTestAttachment(attachID, nodeName, pvName)
attachment.Status.Attached = finalAttached
attachment.Status.AttachError = finalAttachErr
fakeWatcher.Modify(attachment)
}()
}
retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
if tc.shouldFail && err == nil {
t.Error("expecting failure, but err is nil")
}
if tc.initAttachErr != nil && err != nil {
if tc.initAttachErr.Message != err.Error() {
t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error())
}
}
if err == nil && retID != attachID {
t.Errorf("attacher.WaitForAttach not returning attachment ID")
}
wg.Wait()
})
}
}
func TestAttacherVolumesAreAttached(t *testing.T) {
type attachedSpec struct {
volName string