diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 95ab8e29f1a..149ef2d7c05 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -26,9 +26,11 @@ import ( "testing" "time" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -371,7 +373,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func(volSpec *volume.Spec, expectAttach bool) { - attachID, err := csiAttacher.Attach(volSpec, types.NodeName("node")) + attachID, err := csiAttacher.Attach(volSpec, types.NodeName("fakeNode")) defer wg.Done() if err != nil { @@ -383,7 +385,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { }(spec, test.expectVolumeAttachment) if test.expectVolumeAttachment { - expectedAttachID := getAttachmentName("test-vol", test.driver, "node") + expectedAttachID := getAttachmentName("test-vol", test.driver, "fakeNode") status := storage.VolumeAttachmentStatus{ Attached: true, } @@ -433,6 +435,12 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) { getTestCSIDriver("not-attachable", nil, &bFalse, nil), getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }, ) plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) @@ -478,21 +486,21 @@ func TestAttacherWaitForAttach(t *testing.T) { driver: "attachable", makeAttachment: func() *storage.VolumeAttachment { - testAttachID := getAttachmentName("test-vol", "attachable", "node") - successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv") + testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode") + successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "test-pv") successfulAttachment.Status.Attached = true return successfulAttachment }, spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), - expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), + expectedAttachID: getAttachmentName("test-vol", "attachable", "fakeNode"), expectError: false, }, { name: "failed attach with vol source", makeAttachment: func() *storage.VolumeAttachment { - testAttachID := getAttachmentName("test-vol", "attachable", "node") - successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") + testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode") + successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "volSrc01") successfulAttachment.Status.Attached = true return successfulAttachment }, @@ -559,21 +567,21 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { name: "successful attach with PV", makeAttachment: func() *storage.VolumeAttachment { - testAttachID := getAttachmentName("test-vol", "attachable", "node") - successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv") + testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode") + successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "test-pv") successfulAttachment.Status.Attached = true return successfulAttachment }, spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), - expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), + expectedAttachID: getAttachmentName("test-vol", "attachable", "fakeNode"), expectError: false, }, { name: "failed attach with volSrc", makeAttachment: func() *storage.VolumeAttachment { - testAttachID := getAttachmentName("test-vol", "attachable", "node") - successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") + testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode") + successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "volSrc01") successfulAttachment.Status.Attached = true return successfulAttachment }, @@ -625,7 +633,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { } func TestAttacherWaitForVolumeAttachment(t *testing.T) { - nodeName := "test-node" + nodeName := "fakeNode" testCases := []struct { name string initAttached bool @@ -781,7 +789,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } csiAttacher := attacher.(*csiAttacher) - nodeName := "test-node" + nodeName := "fakeNode" var specs []*volume.Spec // create and save volume attchments @@ -852,7 +860,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { t.Fatalf("failed to create new attacher: %v", err) } csiAttacher := attacher.(*csiAttacher) - nodeName := "test-node" + nodeName := "fakeNode" var specs []*volume.Spec // create and save volume attchments @@ -891,8 +899,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) { } func TestAttacherDetach(t *testing.T) { - - nodeName := "test-node" + nodeName := "fakeNode" testCases := []struct { name string volID string @@ -1492,6 +1499,12 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu if fakeClient == nil { fakeClient = fakeclient.NewSimpleClientset() } + fakeClient.Tracker().Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }) fakeWatcher := watch.NewRaceFreeFake() fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) @@ -1504,12 +1517,11 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, fakeClient, - nil, - "node", + ProbeVolumePlugins(), + "fakeNode", csiDriverLister, ) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + plugMgr := host.GetPluginMgr() plug, err := plugMgr.FindPluginByName(CSIPluginName) if err != nil { diff --git a/pkg/volume/csi/csi_block_test.go b/pkg/volume/csi/csi_block_test.go index 61e76293d35..bfe793d6ce4 100644 --- a/pkg/volume/csi/csi_block_test.go +++ b/pkg/volume/csi/csi_block_test.go @@ -30,7 +30,6 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" - volumetest "k8s.io/kubernetes/pkg/volume/testing" ) func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) { @@ -244,15 +243,6 @@ func TestBlockMapperSetupDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) - fakeClient := fakeclient.NewSimpleClientset() - host := volumetest.NewFakeVolumeHostWithCSINodeName( - tmpDir, - fakeClient, - nil, - "fakeNode", - nil, - ) - plug.host = host csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) if err != nil { @@ -295,15 +285,6 @@ func TestBlockMapperMapPodDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) - fakeClient := fakeclient.NewSimpleClientset() - host := volumetest.NewFakeVolumeHostWithCSINodeName( - tmpDir, - fakeClient, - nil, - "fakeNode", - nil, - ) - plug.host = host csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) if err != nil { @@ -371,14 +352,6 @@ func TestBlockMapperMapPodDeviceNotSupportAttach(t *testing.T) { plug, tmpDir := newTestPlugin(t, fakeClient) defer os.RemoveAll(tmpDir) - host := volumetest.NewFakeVolumeHostWithCSINodeName( - tmpDir, - fakeClient, - nil, - "fakeNode", - plug.csiDriverLister, - ) - plug.host = host csiMapper, _, _, err := prepareBlockMapperTest(plug, "test-pv", t) if err != nil { t.Fatalf("Failed to make a new Mapper: %v", err) @@ -401,15 +374,6 @@ func TestBlockMapperTearDownDevice(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) - fakeClient := fakeclient.NewSimpleClientset() - host := volumetest.NewFakeVolumeHostWithCSINodeName( - tmpDir, - fakeClient, - nil, - "fakeNode", - nil, - ) - plug.host = host _, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t) if err != nil { diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index ef62b7ca290..985e208faa0 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -34,7 +34,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" fakeclient "k8s.io/client-go/kubernetes/fake" featuregatetesting "k8s.io/component-base/featuregate/testing" - "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -151,7 +150,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) { currentPodInfoMount := true for _, test := range tests { t.Run(test.name, func(t *testing.T) { - klog.Infof("Starting test %s", test.name) // Modes must be set if (and only if) CSIInlineVolume is enabled. var modes []storagev1beta1.VolumeLifecycleMode defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, test.csiInlineVolume)() diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 883e96e1441..a4b929dc674 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -25,8 +25,10 @@ import ( "testing" api "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -50,6 +52,13 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri client = fakeclient.NewSimpleClientset() } + client.Tracker().Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }) + // Start informer for CSIDrivers. factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() @@ -59,14 +68,13 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, - nil, + ProbeVolumePlugins(), "fakeNode", csiDriverLister, ) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) - plug, err := plugMgr.FindPluginByName(CSIPluginName) + pluginMgr := host.GetPluginMgr() + plug, err := pluginMgr.FindPluginByName(CSIPluginName) if err != nil { t.Fatalf("can't find plugin %v", CSIPluginName) } @@ -998,18 +1006,25 @@ func TestPluginFindAttachablePlugin(t *testing.T) { } defer os.RemoveAll(tmpDir) - client := fakeclient.NewSimpleClientset(getTestCSIDriver(test.driverName, nil, &test.canAttach, nil)) + client := fakeclient.NewSimpleClientset( + getTestCSIDriver(test.driverName, nil, &test.canAttach, nil), + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }, + ) factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, - nil, + ProbeVolumePlugins(), "fakeNode", factory.Storage().V1beta1().CSIDrivers().Lister(), ) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + plugMgr := host.GetPluginMgr() plugin, err := plugMgr.FindAttachablePluginBySpec(test.spec) if err != nil && !test.shouldFail { @@ -1118,10 +1133,16 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) { } defer os.RemoveAll(tmpDir) - client := fakeclient.NewSimpleClientset() - host := volumetest.NewFakeVolumeHost(tmpDir, client, nil) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + client := fakeclient.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }, + ) + host := volumetest.NewFakeVolumeHostWithCSINodeName(tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil) + plugMgr := host.GetPluginMgr() plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec) if err != nil && !test.shouldFail { diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 2e198f257ab..b429a0fa439 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -26,6 +26,7 @@ import ( "time" api "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" storagebeta1 "k8s.io/api/storage/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -239,6 +240,12 @@ func TestCSI_VolumeAll(t *testing.T) { } objs = append(objs, driverInfo) } + objs = append(objs, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }) client := fakeclient.NewSimpleClientset(objs...) fakeWatcher := watch.NewRaceFreeFake() @@ -253,13 +260,11 @@ func TestCSI_VolumeAll(t *testing.T) { host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, - nil, - "csi-node", + ProbeVolumePlugins(), + "fakeNode", csiDriverInformer.Lister(), ) - - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + plugMgr := host.GetPluginMgr() csiClient := setupClient(t, true) volSpec := test.specFunc(test.specName, test.driver, test.volName) diff --git a/pkg/volume/csi/testing/BUILD b/pkg/volume/csi/testing/BUILD index 78cddd23181..6043ba583c8 100644 --- a/pkg/volume/csi/testing/BUILD +++ b/pkg/volume/csi/testing/BUILD @@ -10,6 +10,8 @@ go_library( "//pkg/volume:go_default_library", "//pkg/volume/csi:go_default_library", "//pkg/volume/testing:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", diff --git a/pkg/volume/csi/testing/testing.go b/pkg/volume/csi/testing/testing.go index f4d4d9a4cba..1b73f693731 100644 --- a/pkg/volume/csi/testing/testing.go +++ b/pkg/volume/csi/testing/testing.go @@ -17,6 +17,10 @@ limitations under the License. package testing import ( + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" @@ -26,7 +30,6 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi" volumetest "k8s.io/kubernetes/pkg/volume/testing" - "testing" ) // NewTestPlugin creates a plugin mgr to load plugins and setup a fake client @@ -40,6 +43,13 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl client = fakeclient.NewSimpleClientset() } + client.Tracker().Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fakeNode", + }, + Spec: v1.NodeSpec{}, + }) + // Start informer for CSIDrivers. factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod) csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() @@ -49,12 +59,11 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, - nil, + csi.ProbeVolumePlugins(), "fakeNode", csiDriverLister, ) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(csi.ProbeVolumePlugins(), nil /* prober */, host) + plugMgr := host.GetPluginMgr() plug, err := plugMgr.FindPluginByName(csi.CSIPluginName) if err != nil { diff --git a/pkg/volume/scaleio/sio_volume_test.go b/pkg/volume/scaleio/sio_volume_test.go index 196eb1aee44..d35e8b6d86a 100644 --- a/pkg/volume/scaleio/sio_volume_test.go +++ b/pkg/volume/scaleio/sio_volume_test.go @@ -55,11 +55,10 @@ func newPluginMgr(t *testing.T, apiObject runtime.Object) (*volume.VolumePluginM host := volumetest.NewFakeVolumeHostWithNodeLabels( tmpDir, fakeClient, - nil, + ProbeVolumePlugins(), map[string]string{sdcGUIDLabelName: "abc-123"}, ) - plugMgr := &volume.VolumePluginMgr{} - plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) + plugMgr := host.GetPluginMgr() return plugMgr, tmpDir } diff --git a/pkg/volume/testing/BUILD b/pkg/volume/testing/BUILD index d4a7b3cbbcf..8d9502fd331 100644 --- a/pkg/volume/testing/BUILD +++ b/pkg/volume/testing/BUILD @@ -25,6 +25,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 7e9cce332a4..5dc99c0a0c4 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -37,6 +37,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" storagelistersv1 "k8s.io/client-go/listers/storage/v1" @@ -71,7 +72,7 @@ const ( type fakeVolumeHost struct { rootDir string kubeClient clientset.Interface - pluginMgr VolumePluginMgr + pluginMgr *VolumePluginMgr cloud cloudprovider.Interface mounter mount.Interface hostUtil hostutil.HostUtils @@ -81,47 +82,48 @@ type fakeVolumeHost struct { subpather subpath.Interface csiDriverLister storagelisters.CSIDriverLister informerFactory informers.SharedInformerFactory + kubeletErr error + mux sync.Mutex } var _ VolumeHost = &fakeVolumeHost{} var _ AttachDetachVolumeHost = &fakeVolumeHost{} func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { - return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) + return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, "", nil) } func NewFakeVolumeHostWithCloudProvider(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { - return newFakeVolumeHost(rootDir, kubeClient, plugins, cloud, nil) + return newFakeVolumeHost(rootDir, kubeClient, plugins, cloud, nil, "", nil) } func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost { - volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) + volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, "", nil) volHost.nodeLabels = labels return volHost } func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost { - volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) - volHost.nodeName = nodeName - if driverLister != nil { - volHost.csiDriverLister = driverLister - } + volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister) return volHost } -func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost { - host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud} +func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost { + host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister} host.mounter = mount.NewFakeMounter(nil) host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap) host.exec = &testingexec.FakeExec{DisableScripts: true} + host.pluginMgr = &VolumePluginMgr{} host.pluginMgr.InitPlugins(plugins, nil /* prober */, host) host.subpather = &subpath.FakeSubpath{} host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute) + // Wait until the InitPlugins setup is finished before returning from this setup func + host.WaitForKubeletErrNil() return host } func NewFakeVolumeHostWithMounterFSType(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost { - volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, pathToTypeMap) + volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil) return volHost } @@ -169,6 +171,10 @@ func (f *fakeVolumeHost) GetSubpather() subpath.Interface { return f.subpather } +func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr { + return f.pluginMgr +} + func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) { // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}" wrapperVolumeName := "wrapped_" + volName @@ -1519,17 +1525,13 @@ func VerifyGetMapPodDeviceCallCount( // manager and fake volume plugin using a fake volume host. func GetTestVolumePluginMgr( t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin) { - v := NewFakeVolumeHost( - "", /* rootDir */ - nil, /* kubeClient */ - nil, /* plugins */ - ) plugins := ProbeVolumePlugins(VolumeConfig{}) - if err := v.pluginMgr.InitPlugins(plugins, nil /* prober */, v); err != nil { - t.Fatal(err) - } - - return &v.pluginMgr, plugins[0].(*FakeVolumePlugin) + v := NewFakeVolumeHost( + "", /* rootDir */ + nil, /* kubeClient */ + plugins, /* plugins */ + ) + return v.pluginMgr, plugins[0].(*FakeVolumePlugin) } // CreateTestPVC returns a provisionable PVC for tests @@ -1593,9 +1595,20 @@ func (f *fakeVolumeHost) IsAttachDetachController() bool { } func (f *fakeVolumeHost) SetKubeletError(err error) { + f.mux.Lock() + defer f.mux.Unlock() + f.kubeletErr = err return } func (f *fakeVolumeHost) WaitForCacheSync() error { return nil } + +func (f *fakeVolumeHost) WaitForKubeletErrNil() error { + return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { + f.mux.Lock() + defer f.mux.Unlock() + return f.kubeletErr == nil, nil + }) +} diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index b14a8ecabf1..afdc717f210 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -168,8 +168,9 @@ func TestPodDeletionWithDswp(t *testing.T) { t.Fatalf("Failed to created node : %v", err) } - go informers.Core().V1().Nodes().Informer().Run(podStopCh) + stopCh := make(chan struct{}) + go informers.Core().V1().Nodes().Informer().Run(stopCh) if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil { t.Errorf("Failed to create pod : %v", err) } @@ -178,11 +179,11 @@ func TestPodDeletionWithDswp(t *testing.T) { go podInformer.Run(podStopCh) // start controller loop - stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) + defer close(stopCh) waitToObservePods(t, podInformer, 1) podKey, err := cache.MetaNamespaceKeyFunc(pod) @@ -207,13 +208,12 @@ func TestPodDeletionWithDswp(t *testing.T) { waitToObservePods(t, podInformer, 0) // the populator loop turns every 1 minute waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 80*time.Second, "expected 0 pods in dsw after pod delete", 0) - close(stopCh) } func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) { if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { - go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh) + go informers.Storage().V1().CSINodes().Informer().Run(stopCh) } if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh)