Merge pull request #97834 from chrishenzie/volume-attachment-informer

Update CSI attacher to fetch VolumeAttachments from lister
This commit is contained in:
Kubernetes Prow Robot
2021-02-26 13:58:15 -08:00
committed by GitHub
7 changed files with 271 additions and 184 deletions

View File

@@ -29,6 +29,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@@ -26,6 +26,7 @@ import (
"strings" "strings"
"time" "time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@@ -34,6 +35,7 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
@@ -76,52 +78,56 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
node := string(nodeName) node := string(nodeName)
attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node) attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
var vaSrc storage.VolumeAttachmentSource attachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
if spec.InlineVolumeSpecForCSIMigration { if err != nil && !apierrors.IsNotFound(err) {
// inline PV scenario - use PV spec to populate VA source. return "", errors.New(log("failed to get volume attachment from lister: %v", err))
// The volume spec will be populated by CSI translation API }
// for inline volumes. This allows fields required by the CSI
// attacher such as AccessMode and MountOptions (in addition to if attachment == nil {
// fields in the CSI persistent volume source) to be populated var vaSrc storage.VolumeAttachmentSource
// as part of CSI translation for inline volumes. if spec.InlineVolumeSpecForCSIMigration {
vaSrc = storage.VolumeAttachmentSource{ // inline PV scenario - use PV spec to populate VA source.
InlineVolumeSpec: &spec.PersistentVolume.Spec, // The volume spec will be populated by CSI translation API
// for inline volumes. This allows fields required by the CSI
// attacher such as AccessMode and MountOptions (in addition to
// fields in the CSI persistent volume source) to be populated
// as part of CSI translation for inline volumes.
vaSrc = storage.VolumeAttachmentSource{
InlineVolumeSpec: &spec.PersistentVolume.Spec,
}
} else {
// regular PV scenario - use PV name to populate VA source
pvName := spec.PersistentVolume.GetName()
vaSrc = storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
}
} }
} else {
// regular PV scenario - use PV name to populate VA source attachment := &storage.VolumeAttachment{
pvName := spec.PersistentVolume.GetName() ObjectMeta: meta.ObjectMeta{
vaSrc = storage.VolumeAttachmentSource{ Name: attachID,
PersistentVolumeName: &pvName, },
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: pvSrc.Driver,
Source: vaSrc,
},
}
_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return "", errors.New(log("attacher.Attach failed: %v", err))
}
klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
} else {
klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
} }
} }
attachment := &storage.VolumeAttachment{ // Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
ObjectMeta: meta.ObjectMeta{ // and has access to a VolumeAttachment lister that can be polled for the current status.
Name: attachID, if err := c.waitForVolumeAttachmentWithLister(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
},
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: pvSrc.Driver,
Source: vaSrc,
},
}
_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
alreadyExist := false
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return "", errors.New(log("attacher.Attach failed: %v", err))
}
alreadyExist = true
}
if alreadyExist {
klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
} else {
klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
}
if _, err := c.waitForVolumeAttachment(pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
return "", err return "", err
} }
@@ -166,6 +172,32 @@ func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID str
return attach.Name, nil return attach.Name, nil
} }
func (c *csiAttacher) waitForVolumeAttachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
verifyStatus := func() (bool, error) {
volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
if err != nil {
// Ignore "not found" errors in case the VolumeAttachment was just created and hasn't yet made it into the lister.
if !apierrors.IsNotFound(err) {
klog.Error(log("unexpected error waiting for volume attachment, %v", err))
return false, err
}
// The VolumeAttachment is not available yet and we will have to try again.
return false, nil
}
successful, err := verifyAttachmentStatus(volumeAttachment, volumeHandle)
if err != nil {
return false, err
}
return successful, nil
}
return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Attach")
}
func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) { func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs))) klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
@@ -399,36 +431,70 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
} }
klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID)) klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
err := c.waitForVolumeDetachment(volID, attachID, c.watchTimeout)
return err // Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
// and has access to a VolumeAttachment lister that can be polled for the current status.
return c.waitForVolumeDetachmentWithLister(volID, attachID, c.watchTimeout)
} }
func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string, timeout time.Duration) error { func (c *csiAttacher) waitForVolumeDetachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
defer timer.Stop()
return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer,
timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID)) klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
if err != nil { verifyStatus := func() (bool, error) {
if apierrors.IsNotFound(err) { volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
//object deleted or never existed, done if err != nil {
if !apierrors.IsNotFound(err) {
return false, errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
}
// Detachment successful.
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle)) klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
return nil return true, nil
} }
return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
// Detachment is only "successful" once the VolumeAttachment is deleted, however we perform
// this check to make sure the object does not contain any detach errors.
successful, err := verifyDetachmentStatus(volumeAttachment, volumeHandle)
if err != nil {
return false, err
}
return successful, nil
} }
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus)
if err != nil { return c.waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID, timeout, verifyStatus, "Detach")
return err }
func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(volumeHandle, attachID string, timeout time.Duration, verifyStatus func() (bool, error), operation string) error {
var (
initBackoff = 500 * time.Millisecond
// This is approximately the duration between consecutive ticks after two minutes (CSI timeout).
maxBackoff = 7 * time.Second
resetDuration = time.Minute
backoffFactor = 1.05
jitter = 0.1
clock = &clock.RealClock{}
)
backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
defer backoffMgr.Backoff().Stop()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
select {
case <-backoffMgr.Backoff().C():
successful, err := verifyStatus()
if err != nil {
return err
}
if successful {
return nil
}
case <-ctx.Done():
klog.Error(log("%s timeout after %v [volume=%v; attachment.ID=%v]", operation, timeout, volumeHandle, attachID))
return fmt.Errorf("%s timeout for volume %v", operation, volumeHandle)
}
} }
return err
} }
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string, func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,

View File

@@ -199,12 +199,9 @@ func TestAttacherAttach(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Logf("test case: %s", tc.name) t.Logf("test case: %s", tc.name)
fakeClient := fakeclient.NewSimpleClientset() fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err := plug.NewAttacher() attacher, err := plug.NewAttacher()
if err != nil { if err != nil {
t.Fatalf("failed to create new attacher: %v", err) t.Fatalf("failed to create new attacher: %v", err)
@@ -234,12 +231,10 @@ func TestAttacherAttach(t *testing.T) {
status.AttachError = &storage.VolumeError{ status.AttachError = &storage.VolumeError{
Message: "attacher error", Message: "attacher error",
} }
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
fakeWatcher.Error(&errStatus)
} else { } else {
status.Attached = true status.Attached = true
} }
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status)
wg.Wait() wg.Wait()
}) })
} }
@@ -291,12 +286,9 @@ func TestAttacherAttachWithInline(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Logf("test case: %s", tc.name) t.Logf("test case: %s", tc.name)
fakeClient := fakeclient.NewSimpleClientset() fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err := plug.NewAttacher() attacher, err := plug.NewAttacher()
if err != nil { if err != nil {
t.Fatalf("failed to create new attacher: %v", err) t.Fatalf("failed to create new attacher: %v", err)
@@ -325,7 +317,7 @@ func TestAttacherAttachWithInline(t *testing.T) {
} else { } else {
status.Attached = true status.Attached = true
} }
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, tc.attachID, status) markVolumeAttached(t, csiAttacher.k8s, nil, tc.attachID, status)
wg.Wait() wg.Wait()
}) })
} }
@@ -366,25 +358,9 @@ func TestAttacherWithCSIDriver(t *testing.T) {
getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("attachable", nil, &bTrue, nil),
getTestCSIDriver("nil", nil, nil, nil), getTestCSIDriver("nil", nil, nil, nil),
) )
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
attachmentWatchCreated := make(chan core.Action)
// Make sure this is the first reactor
fakeClient.Fake.PrependWatchReactor("volumeattachments", func(action core.Action) (bool, watch.Interface, error) {
select {
case <-attachmentWatchCreated:
// already closed
default:
// The attacher is already watching the attachment, notify the test goroutine to
// update the status of attachment.
// TODO: In theory this still has a race condition, because the actual watch is created by
// the next reactor in the chain and we unblock the test goroutine before returning here.
close(attachmentWatchCreated)
}
return false, nil, nil
})
attacher, err := plug.NewAttacher() attacher, err := plug.NewAttacher()
if err != nil { if err != nil {
t.Fatalf("failed to create new attacher: %v", err) t.Fatalf("failed to create new attacher: %v", err)
@@ -423,9 +399,6 @@ func TestAttacherWithCSIDriver(t *testing.T) {
status := storage.VolumeAttachmentStatus{ status := storage.VolumeAttachmentStatus{
Attached: true, Attached: true,
} }
// We want to ensure the watcher, which is created in csiAttacher,
// has been started before updating the status of attachment.
<-attachmentWatchCreated
markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status) markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status)
} }
wg.Wait() wg.Wait()
@@ -827,7 +800,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil) plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, nil)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
attacher, err := plug.NewAttacher() attacher, err := plug.NewAttacher()
@@ -947,12 +920,11 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) {
func TestAttacherDetach(t *testing.T) { func TestAttacherDetach(t *testing.T) {
nodeName := "fakeNode" nodeName := "fakeNode"
testCases := []struct { testCases := []struct {
name string name string
volID string volID string
attachID string attachID string
shouldFail bool shouldFail bool
watcherError bool reactor func(action core.Action) (handled bool, ret runtime.Object, err error)
reactor func(action core.Action) (handled bool, ret runtime.Object, err error)
}{ }{
{name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)}, {name: "normal test", volID: "vol-001", attachID: getAttachmentName("vol-001", testDriver, nodeName)},
{name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)}, {name: "normal test 2", volID: "vol-002", attachID: getAttachmentName("vol-002", testDriver, nodeName)},
@@ -970,31 +942,15 @@ func TestAttacherDetach(t *testing.T) {
return false, nil, nil return false, nil, nil
}, },
}, },
{
name: "API watch error happen",
volID: "vol-005",
attachID: getAttachmentName("vol-005", testDriver, nodeName),
shouldFail: true,
watcherError: true,
reactor: func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.Matches("get", "volumeattachments") {
return true, makeTestAttachment(getAttachmentName("vol-005", testDriver, nodeName), nodeName, "vol-005"), nil
}
return false, nil, nil
},
},
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Logf("running test: %v", tc.name) t.Logf("running test: %v", tc.name)
fakeClient := fakeclient.NewSimpleClientset() fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPluginWithAttachDetachVolumeHost(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
if tc.reactor != nil { if tc.reactor != nil {
fakeClient.PrependReactor("*", "*", tc.reactor) fakeClient.PrependReactor("*", "*", tc.reactor)
} }
@@ -1016,18 +972,7 @@ func TestAttacherDetach(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("test case %s failed: %v", tc.name, err) t.Errorf("test case %s failed: %v", tc.name, err)
} }
watchError := tc.watcherError
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if watchError {
errStatus := apierrors.NewInternalError(fmt.Errorf("we got an error")).Status()
fakeWatcher.Error(&errStatus)
return
}
fakeWatcher.Delete(attachment)
}()
err = csiAttacher.Detach(volumeName, types.NodeName(nodeName)) err = csiAttacher.Detach(volumeName, types.NodeName(nodeName))
if tc.shouldFail && err == nil { if tc.shouldFail && err == nil {
t.Fatal("expecting failure, but err = nil") t.Fatal("expecting failure, but err = nil")
@@ -1045,7 +990,6 @@ func TestAttacherDetach(t *testing.T) {
t.Errorf("expecting attachment not to be nil, but it is") t.Errorf("expecting attachment not to be nil, but it is")
} }
} }
wg.Wait()
}) })
} }
} }
@@ -1239,9 +1183,6 @@ func TestAttacherMountDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
attacher, err0 := plug.NewAttacher() attacher, err0 := plug.NewAttacher()
if err0 != nil { if err0 != nil {
t.Fatalf("failed to create new attacher: %v", err0) t.Fatalf("failed to create new attacher: %v", err0)
@@ -1255,7 +1196,6 @@ func TestAttacherMountDevice(t *testing.T) {
nodeName := string(csiAttacher.plugin.host.GetNodeName()) nodeName := string(csiAttacher.plugin.host.GetNodeName())
attachID := getAttachmentName(tc.volName, testDriver, nodeName) attachID := getAttachmentName(tc.volName, testDriver, nodeName)
var wg sync.WaitGroup
if tc.createAttachment { if tc.createAttachment {
// Set up volume attachment // Set up volume attachment
@@ -1264,11 +1204,6 @@ func TestAttacherMountDevice(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("failed to attach: %v", err) t.Fatalf("failed to attach: %v", err)
} }
wg.Add(1)
go func() {
defer wg.Done()
fakeWatcher.Delete(attachment)
}()
} }
parent := filepath.Dir(tc.deviceMountPath) parent := filepath.Dir(tc.deviceMountPath)
@@ -1359,8 +1294,6 @@ func TestAttacherMountDevice(t *testing.T) {
} }
} }
} }
wg.Wait()
}) })
} }
} }

View File

@@ -41,8 +41,22 @@ import (
volumetest "k8s.io/kubernetes/pkg/volume/testing" volumetest "k8s.io/kubernetes/pkg/volume/testing"
) )
// create a plugin mgr to load plugins and setup a fake client const (
volumeHostType int = iota
kubeletVolumeHostType
attachDetachVolumeHostType
)
func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, string) { func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, string) {
return newTestPluginWithVolumeHost(t, client, kubeletVolumeHostType)
}
func newTestPluginWithAttachDetachVolumeHost(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, string) {
return newTestPluginWithVolumeHost(t, client, attachDetachVolumeHostType)
}
// create a plugin mgr to load plugins and setup a fake client
func newTestPluginWithVolumeHost(t *testing.T, client *fakeclient.Clientset, hostType int) (*csiPlugin, string) {
tmpDir, err := utiltesting.MkTmpdir("csi-test") tmpDir, err := utiltesting.MkTmpdir("csi-test")
if err != nil { if err != nil {
t.Fatalf("can't create temp dir: %v", err) t.Fatalf("can't create temp dir: %v", err)
@@ -77,16 +91,45 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
} }
} }
host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, var host volume.VolumeHost
tmpDir, switch hostType {
client, case volumeHostType:
ProbeVolumePlugins(), host = volumetest.NewFakeVolumeHostWithCSINodeName(t,
"fakeNode", tmpDir,
csiDriverLister, client,
volumeAttachmentLister, ProbeVolumePlugins(),
) "fakeNode",
csiDriverLister,
nil,
)
case kubeletVolumeHostType:
host = volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t,
tmpDir,
client,
ProbeVolumePlugins(),
"fakeNode",
csiDriverLister,
volumeAttachmentLister,
)
case attachDetachVolumeHostType:
host = volumetest.NewFakeAttachDetachVolumeHostWithCSINodeName(t,
tmpDir,
client,
ProbeVolumePlugins(),
"fakeNode",
csiDriverLister,
volumeAttachmentLister,
)
default:
t.Fatalf("Unsupported volume host type")
}
pluginMgr := host.GetPluginMgr() fakeHost, ok := host.(volumetest.FakeVolumeHost)
if !ok {
t.Fatalf("Unsupported volume host type")
}
pluginMgr := fakeHost.GetPluginMgr()
plug, err := pluginMgr.FindPluginByName(CSIPluginName) plug, err := pluginMgr.FindPluginByName(CSIPluginName)
if err != nil { if err != nil {
t.Fatalf("can't find plugin %v", CSIPluginName) t.Fatalf("can't find plugin %v", CSIPluginName)

View File

@@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
fakeclient "k8s.io/client-go/kubernetes/fake" fakeclient "k8s.io/client-go/kubernetes/fake"
@@ -274,7 +273,6 @@ func TestCSI_VolumeAll(t *testing.T) {
}) })
client := fakeclient.NewSimpleClientset(objs...) client := fakeclient.NewSimpleClientset(objs...)
fakeWatcher := watch.NewRaceFreeFake()
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */) factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverInformer := factory.Storage().V1().CSIDrivers()
@@ -282,10 +280,11 @@ func TestCSI_VolumeAll(t *testing.T) {
if driverInfo != nil { if driverInfo != nil {
csiDriverInformer.Informer().GetStore().Add(driverInfo) csiDriverInformer.Informer().GetStore().Add(driverInfo)
} }
factory.Start(wait.NeverStop) factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop) factory.WaitForCacheSync(wait.NeverStop)
host := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t, attachDetachVolumeHost := volumetest.NewFakeAttachDetachVolumeHostWithCSINodeName(t,
tmpDir, tmpDir,
client, client,
ProbeVolumePlugins(), ProbeVolumePlugins(),
@@ -293,18 +292,18 @@ func TestCSI_VolumeAll(t *testing.T) {
csiDriverInformer.Lister(), csiDriverInformer.Lister(),
volumeAttachmentInformer.Lister(), volumeAttachmentInformer.Lister(),
) )
plugMgr := host.GetPluginMgr() attachDetachPlugMgr := attachDetachVolumeHost.GetPluginMgr()
csiClient := setupClient(t, true) csiClient := setupClient(t, true)
volSpec := test.specFunc(test.specName, test.driver, test.volName) volSpec := test.specFunc(test.specName, test.driver, test.volName)
pod := test.podFunc() pod := test.podFunc()
attachName := getAttachmentName(test.volName, test.driver, string(host.GetNodeName())) attachName := getAttachmentName(test.volName, test.driver, string(attachDetachVolumeHost.GetNodeName()))
t.Log("csiTest.VolumeAll starting...") t.Log("csiTest.VolumeAll starting...")
// *************** Attach/Mount volume resources ****************// // *************** Attach/Mount volume resources ****************//
// attach volume // attach volume
t.Log("csiTest.VolumeAll Attaching volume...") t.Log("csiTest.VolumeAll Attaching volume...")
attachPlug, err := plugMgr.FindAttachablePluginBySpec(volSpec) attachPlug, err := attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec)
if err != nil { if err != nil {
if !test.shouldFail { if !test.shouldFail {
t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err) t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err)
@@ -333,10 +332,8 @@ func TestCSI_VolumeAll(t *testing.T) {
} }
// creates VolumeAttachment and blocks until it is marked attached (done by external attacher) // creates VolumeAttachment and blocks until it is marked attached (done by external attacher)
attachDone := make(chan struct{})
go func() { go func() {
defer close(attachDone) attachID, err := volAttacher.Attach(volSpec, attachDetachVolumeHost.GetNodeName())
attachID, err := volAttacher.Attach(volSpec, host.GetNodeName())
if err != nil { if err != nil {
t.Errorf("csiTest.VolumeAll attacher.Attach failed: %s", err) t.Errorf("csiTest.VolumeAll attacher.Attach failed: %s", err)
return return
@@ -345,8 +342,7 @@ func TestCSI_VolumeAll(t *testing.T) {
}() }()
// Simulates external-attacher and marks VolumeAttachment.Status.Attached = true // Simulates external-attacher and marks VolumeAttachment.Status.Attached = true
markVolumeAttached(t, host.GetKubeClient(), fakeWatcher, attachName, storage.VolumeAttachmentStatus{Attached: true}) markVolumeAttached(t, attachDetachVolumeHost.GetKubeClient(), nil, attachName, storage.VolumeAttachmentStatus{Attached: true})
<-attachDone
// Observe attach on this node. // Observe attach on this node.
devicePath, err = volAttacher.WaitForAttach(volSpec, "", pod, 500*time.Millisecond) devicePath, err = volAttacher.WaitForAttach(volSpec, "", pod, 500*time.Millisecond)
@@ -364,9 +360,22 @@ func TestCSI_VolumeAll(t *testing.T) {
t.Log("csiTest.VolumeAll volume attacher not found, skipping attachment") t.Log("csiTest.VolumeAll volume attacher not found, skipping attachment")
} }
// The reason for separate volume hosts here is because the attach/detach behavior is exclusive to the
// CSI plugin running in the AttachDetachController. Similarly, the mount/unmount behavior is exclusive
// to the CSI plugin running in the Kubelet.
kubeletVolumeHost := volumetest.NewFakeKubeletVolumeHostWithCSINodeName(t,
tmpDir,
client,
ProbeVolumePlugins(),
"fakeNode",
csiDriverInformer.Lister(),
volumeAttachmentInformer.Lister(),
)
kubeletPlugMgr := kubeletVolumeHost.GetPluginMgr()
// Mount Device // Mount Device
t.Log("csiTest.VolumeAll Mouting device...") t.Log("csiTest.VolumeAll Mouting device...")
devicePlug, err := plugMgr.FindDeviceMountablePluginBySpec(volSpec) devicePlug, err := kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec)
if err != nil { if err != nil {
t.Fatalf("csiTest.VolumeAll PluginManager.FindDeviceMountablePluginBySpec failed: %v", err) t.Fatalf("csiTest.VolumeAll PluginManager.FindDeviceMountablePluginBySpec failed: %v", err)
} }
@@ -403,7 +412,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// mount volume // mount volume
t.Log("csiTest.VolumeAll Mouting volume...") t.Log("csiTest.VolumeAll Mouting volume...")
volPlug, err := plugMgr.FindPluginBySpec(volSpec) volPlug, err := kubeletPlugMgr.FindPluginBySpec(volSpec)
if err != nil || volPlug == nil { if err != nil || volPlug == nil {
t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err) t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err)
} }
@@ -499,7 +508,7 @@ func TestCSI_VolumeAll(t *testing.T) {
t.Log("csiTest.VolumeAll Tearing down...") t.Log("csiTest.VolumeAll Tearing down...")
// unmount volume // unmount volume
t.Log("csiTest.VolumeAll Unmouting volume...") t.Log("csiTest.VolumeAll Unmouting volume...")
volPlug, err = plugMgr.FindPluginBySpec(volSpec) volPlug, err = kubeletPlugMgr.FindPluginBySpec(volSpec)
if err != nil || volPlug == nil { if err != nil || volPlug == nil {
t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err) t.Fatalf("csiTest.VolumeAll PluginMgr.FindPluginBySpec failed: %v", err)
} }
@@ -525,7 +534,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// unmount device // unmount device
t.Log("csiTest.VolumeAll Unmouting device...") t.Log("csiTest.VolumeAll Unmouting device...")
devicePlug, err = plugMgr.FindDeviceMountablePluginBySpec(volSpec) devicePlug, err = kubeletPlugMgr.FindDeviceMountablePluginBySpec(volSpec)
if err != nil { if err != nil {
t.Fatalf("csiTest.VolumeAll failed to create mountable device plugin: %s", err) t.Fatalf("csiTest.VolumeAll failed to create mountable device plugin: %s", err)
} }
@@ -569,7 +578,7 @@ func TestCSI_VolumeAll(t *testing.T) {
// detach volume // detach volume
t.Log("csiTest.VolumeAll Detaching volume...") t.Log("csiTest.VolumeAll Detaching volume...")
attachPlug, err = plugMgr.FindAttachablePluginBySpec(volSpec) attachPlug, err = attachDetachPlugMgr.FindAttachablePluginBySpec(volSpec)
if err != nil { if err != nil {
t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err) t.Fatalf("csiTest.VolumeAll PluginManager.FindAttachablePluginBySpec failed: %v", err)
} }
@@ -594,7 +603,7 @@ func TestCSI_VolumeAll(t *testing.T) {
} }
csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher) csiDetacher := getCsiAttacherFromVolumeDetacher(volDetacher)
csiDetacher.csiClient = csiClient csiDetacher.csiClient = csiClient
if err := csiDetacher.Detach(volName, host.GetNodeName()); err != nil { if err := csiDetacher.Detach(volName, attachDetachVolumeHost.GetNodeName()); err != nil {
t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err) t.Fatal("csiTest.VolumeAll detacher.Detach failed:", err)
} }
t.Log("csiTest.VolumeAll detacher.Detach succeeded for volume", volName) t.Log("csiTest.VolumeAll detacher.Detach succeeded for volume", volName)

View File

@@ -1618,7 +1618,7 @@ func GetTestVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin)
nil, /* kubeClient */ nil, /* kubeClient */
plugins, /* plugins */ plugins, /* plugins */
) )
return v.pluginMgr, plugins[0].(*FakeVolumePlugin) return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
} }
func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin) { func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin) {
@@ -1629,7 +1629,7 @@ func GetTestKubeletVolumePluginMgr(t *testing.T) (*VolumePluginMgr, *FakeVolumeP
nil, /* kubeClient */ nil, /* kubeClient */
plugins, /* plugins */ plugins, /* plugins */
) )
return v.pluginMgr, plugins[0].(*FakeVolumePlugin) return v.GetPluginMgr(), plugins[0].(*FakeVolumePlugin)
} }
// CreateTestPVC returns a provisionable PVC for tests // CreateTestPVC returns a provisionable PVC for tests

View File

@@ -45,6 +45,12 @@ import (
testingexec "k8s.io/utils/exec/testing" testingexec "k8s.io/utils/exec/testing"
) )
type FakeVolumeHost interface {
VolumeHost
GetPluginMgr() *VolumePluginMgr
}
// fakeVolumeHost is useful for testing volume plugins. // fakeVolumeHost is useful for testing volume plugins.
// TODO: Extract fields specific to fakeKubeletVolumeHost and fakeAttachDetachVolumeHost. // TODO: Extract fields specific to fakeKubeletVolumeHost and fakeAttachDetachVolumeHost.
type fakeVolumeHost struct { type fakeVolumeHost struct {
@@ -67,20 +73,21 @@ type fakeVolumeHost struct {
} }
var _ VolumeHost = &fakeVolumeHost{} var _ VolumeHost = &fakeVolumeHost{}
var _ FakeVolumeHost = &fakeVolumeHost{}
func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil) return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
} }
func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil) return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
} }
func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost { func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister) return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
} }
func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost { func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister} host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister}
host.mounter = mount.NewFakeMounter(nil) host.mounter = mount.NewFakeMounter(nil)
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap) host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
@@ -244,8 +251,35 @@ type fakeAttachDetachVolumeHost struct {
} }
var _ AttachDetachVolumeHost = &fakeAttachDetachVolumeHost{} var _ AttachDetachVolumeHost = &fakeAttachDetachVolumeHost{}
var _ FakeVolumeHost = &fakeAttachDetachVolumeHost{}
// TODO: Create constructors for AttachDetachVolumeHost once it's consumed in tests. func NewFakeAttachDetachVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
return newFakeAttachDetachVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
}
func newFakeAttachDetachVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
host := &fakeAttachDetachVolumeHost{}
host.rootDir = rootDir
host.kubeClient = kubeClient
host.cloud = cloud
host.nodeName = nodeName
host.csiDriverLister = driverLister
host.volumeAttachmentLister = volumeAttachLister
host.mounter = mount.NewFakeMounter(nil)
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
host.exec = &testingexec.FakeExec{DisableScripts: true}
host.pluginMgr = &VolumePluginMgr{}
if err := host.pluginMgr.InitPlugins(plugins, nil /* prober */, host); err != nil {
t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
}
host.subpather = &subpath.FakeSubpath{}
host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
// Wait until the InitPlugins setup is finished before returning from this setup func
if err := host.WaitForKubeletErrNil(); err != nil {
t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
}
return host
}
func (f *fakeAttachDetachVolumeHost) CSINodeLister() storagelistersv1.CSINodeLister { func (f *fakeAttachDetachVolumeHost) CSINodeLister() storagelistersv1.CSINodeLister {
// not needed for testing // not needed for testing
@@ -269,26 +303,27 @@ type fakeKubeletVolumeHost struct {
} }
var _ KubeletVolumeHost = &fakeKubeletVolumeHost{} var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost { func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil) return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
} }
func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost { func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil) return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
} }
func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeKubeletVolumeHost { func NewFakeKubeletVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) FakeVolumeHost {
volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil) volHost := newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
volHost.nodeLabels = labels volHost.nodeLabels = labels
return volHost return volHost
} }
func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost { func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister) return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
} }
func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost { func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) FakeVolumeHost {
return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil) return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
} }