De-race some CSI unit tests that were initializing the plugin manager…ger (and plugins) twice. Set some const variables earlier to support node info manager initialization and wait for initialization to complete before finishing plugin setup.

This commit is contained in:
David Zhu 2019-11-14 13:02:37 -08:00
parent e64a4bc631
commit 1a47bf54e2
11 changed files with 133 additions and 109 deletions

View File

@ -26,9 +26,11 @@ import (
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1" storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors" apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -371,7 +373,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go func(volSpec *volume.Spec, expectAttach bool) { go func(volSpec *volume.Spec, expectAttach bool) {
attachID, err := csiAttacher.Attach(volSpec, types.NodeName("node")) attachID, err := csiAttacher.Attach(volSpec, types.NodeName("fakeNode"))
defer wg.Done() defer wg.Done()
if err != nil { if err != nil {
@ -383,7 +385,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
}(spec, test.expectVolumeAttachment) }(spec, test.expectVolumeAttachment)
if test.expectVolumeAttachment { if test.expectVolumeAttachment {
expectedAttachID := getAttachmentName("test-vol", test.driver, "node") expectedAttachID := getAttachmentName("test-vol", test.driver, "fakeNode")
status := storage.VolumeAttachmentStatus{ status := storage.VolumeAttachmentStatus{
Attached: true, Attached: true,
} }
@ -433,6 +435,12 @@ func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) {
getTestCSIDriver("not-attachable", nil, &bFalse, nil), getTestCSIDriver("not-attachable", nil, &bFalse, nil),
getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("attachable", nil, &bTrue, nil),
getTestCSIDriver("nil", nil, nil, nil), getTestCSIDriver("nil", nil, nil, nil),
&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
},
) )
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
@ -478,21 +486,21 @@ func TestAttacherWaitForAttach(t *testing.T) {
driver: "attachable", driver: "attachable",
makeAttachment: func() *storage.VolumeAttachment { makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node") testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode")
successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv") successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "test-pv")
successfulAttachment.Status.Attached = true successfulAttachment.Status.Attached = true
return successfulAttachment return successfulAttachment
}, },
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false),
expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), expectedAttachID: getAttachmentName("test-vol", "attachable", "fakeNode"),
expectError: false, expectError: false,
}, },
{ {
name: "failed attach with vol source", name: "failed attach with vol source",
makeAttachment: func() *storage.VolumeAttachment { makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node") testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode")
successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "volSrc01")
successfulAttachment.Status.Attached = true successfulAttachment.Status.Attached = true
return successfulAttachment return successfulAttachment
}, },
@ -559,21 +567,21 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
name: "successful attach with PV", name: "successful attach with PV",
makeAttachment: func() *storage.VolumeAttachment { makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node") testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode")
successfulAttachment := makeTestAttachment(testAttachID, "node", "test-pv") successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "test-pv")
successfulAttachment.Status.Attached = true successfulAttachment.Status.Attached = true
return successfulAttachment return successfulAttachment
}, },
spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false), spec: volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "attachable", "test-vol"), false),
expectedAttachID: getAttachmentName("test-vol", "attachable", "node"), expectedAttachID: getAttachmentName("test-vol", "attachable", "fakeNode"),
expectError: false, expectError: false,
}, },
{ {
name: "failed attach with volSrc", name: "failed attach with volSrc",
makeAttachment: func() *storage.VolumeAttachment { makeAttachment: func() *storage.VolumeAttachment {
testAttachID := getAttachmentName("test-vol", "attachable", "node") testAttachID := getAttachmentName("test-vol", "attachable", "fakeNode")
successfulAttachment := makeTestAttachment(testAttachID, "node", "volSrc01") successfulAttachment := makeTestAttachment(testAttachID, "fakeNode", "volSrc01")
successfulAttachment.Status.Attached = true successfulAttachment.Status.Attached = true
return successfulAttachment return successfulAttachment
}, },
@ -625,7 +633,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
} }
func TestAttacherWaitForVolumeAttachment(t *testing.T) { func TestAttacherWaitForVolumeAttachment(t *testing.T) {
nodeName := "test-node" nodeName := "fakeNode"
testCases := []struct { testCases := []struct {
name string name string
initAttached bool initAttached bool
@ -781,7 +789,7 @@ func TestAttacherVolumesAreAttached(t *testing.T) {
t.Fatalf("failed to create new attacher: %v", err) t.Fatalf("failed to create new attacher: %v", err)
} }
csiAttacher := attacher.(*csiAttacher) csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node" nodeName := "fakeNode"
var specs []*volume.Spec var specs []*volume.Spec
// create and save volume attchments // create and save volume attchments
@ -852,7 +860,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) {
t.Fatalf("failed to create new attacher: %v", err) t.Fatalf("failed to create new attacher: %v", err)
} }
csiAttacher := attacher.(*csiAttacher) csiAttacher := attacher.(*csiAttacher)
nodeName := "test-node" nodeName := "fakeNode"
var specs []*volume.Spec var specs []*volume.Spec
// create and save volume attchments // create and save volume attchments
@ -891,8 +899,7 @@ func TestAttacherVolumesAreAttachedWithInline(t *testing.T) {
} }
func TestAttacherDetach(t *testing.T) { func TestAttacherDetach(t *testing.T) {
nodeName := "fakeNode"
nodeName := "test-node"
testCases := []struct { testCases := []struct {
name string name string
volID string volID string
@ -1492,6 +1499,12 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
if fakeClient == nil { if fakeClient == nil {
fakeClient = fakeclient.NewSimpleClientset() fakeClient = fakeclient.NewSimpleClientset()
} }
fakeClient.Tracker().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
})
fakeWatcher := watch.NewRaceFreeFake() fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
@ -1504,12 +1517,11 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
host := volumetest.NewFakeVolumeHostWithCSINodeName( host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir, tmpDir,
fakeClient, fakeClient,
nil, ProbeVolumePlugins(),
"node", "fakeNode",
csiDriverLister, csiDriverLister,
) )
plugMgr := &volume.VolumePluginMgr{} plugMgr := host.GetPluginMgr()
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(CSIPluginName) plug, err := plugMgr.FindPluginByName(CSIPluginName)
if err != nil { if err != nil {

View File

@ -30,7 +30,6 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
) )
func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) { func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
@ -244,15 +243,6 @@ func TestBlockMapperSetupDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil) plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil { if err != nil {
@ -295,15 +285,6 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil) plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil { if err != nil {
@ -371,14 +352,6 @@ func TestBlockMapperMapPodDeviceNotSupportAttach(t *testing.T) {
plug, tmpDir := newTestPlugin(t, fakeClient) plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"fakeNode",
plug.csiDriverLister,
)
plug.host = host
csiMapper, _, _, err := prepareBlockMapperTest(plug, "test-pv", t) csiMapper, _, _, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil { if err != nil {
t.Fatalf("Failed to make a new Mapper: %v", err) t.Fatalf("Failed to make a new Mapper: %v", err)
@ -401,15 +374,6 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil) plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
fakeClient := fakeclient.NewSimpleClientset()
host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host
_, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t) _, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
if err != nil { if err != nil {

View File

@ -34,7 +34,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
fakeclient "k8s.io/client-go/kubernetes/fake" fakeclient "k8s.io/client-go/kubernetes/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util"
@ -151,7 +150,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
currentPodInfoMount := true currentPodInfoMount := true
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
klog.Infof("Starting test %s", test.name)
// Modes must be set if (and only if) CSIInlineVolume is enabled. // Modes must be set if (and only if) CSIInlineVolume is enabled.
var modes []storagev1beta1.VolumeLifecycleMode var modes []storagev1beta1.VolumeLifecycleMode
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, test.csiInlineVolume)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, test.csiInlineVolume)()

View File

@ -25,8 +25,10 @@ import (
"testing" "testing"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1" storagev1beta1 "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -50,6 +52,13 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
client = fakeclient.NewSimpleClientset() client = fakeclient.NewSimpleClientset()
} }
client.Tracker().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
})
// Start informer for CSIDrivers. // Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
@ -59,14 +68,13 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
host := volumetest.NewFakeVolumeHostWithCSINodeName( host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir, tmpDir,
client, client,
nil, ProbeVolumePlugins(),
"fakeNode", "fakeNode",
csiDriverLister, csiDriverLister,
) )
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(CSIPluginName) pluginMgr := host.GetPluginMgr()
plug, err := pluginMgr.FindPluginByName(CSIPluginName)
if err != nil { if err != nil {
t.Fatalf("can't find plugin %v", CSIPluginName) t.Fatalf("can't find plugin %v", CSIPluginName)
} }
@ -998,18 +1006,25 @@ func TestPluginFindAttachablePlugin(t *testing.T) {
} }
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
client := fakeclient.NewSimpleClientset(getTestCSIDriver(test.driverName, nil, &test.canAttach, nil)) client := fakeclient.NewSimpleClientset(
getTestCSIDriver(test.driverName, nil, &test.canAttach, nil),
&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
},
)
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
host := volumetest.NewFakeVolumeHostWithCSINodeName( host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir, tmpDir,
client, client,
nil, ProbeVolumePlugins(),
"fakeNode", "fakeNode",
factory.Storage().V1beta1().CSIDrivers().Lister(), factory.Storage().V1beta1().CSIDrivers().Lister(),
) )
plugMgr := &volume.VolumePluginMgr{} plugMgr := host.GetPluginMgr()
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
plugin, err := plugMgr.FindAttachablePluginBySpec(test.spec) plugin, err := plugMgr.FindAttachablePluginBySpec(test.spec)
if err != nil && !test.shouldFail { if err != nil && !test.shouldFail {
@ -1118,10 +1133,16 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) {
} }
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
client := fakeclient.NewSimpleClientset() client := fakeclient.NewSimpleClientset(
host := volumetest.NewFakeVolumeHost(tmpDir, client, nil) &v1.Node{
plugMgr := &volume.VolumePluginMgr{} ObjectMeta: metav1.ObjectMeta{
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host) Name: "fakeNode",
},
Spec: v1.NodeSpec{},
},
)
host := volumetest.NewFakeVolumeHostWithCSINodeName(tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil)
plugMgr := host.GetPluginMgr()
plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec) plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec)
if err != nil && !test.shouldFail { if err != nil && !test.shouldFail {

View File

@ -26,6 +26,7 @@ import (
"time" "time"
api "k8s.io/api/core/v1" api "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1" storage "k8s.io/api/storage/v1"
storagebeta1 "k8s.io/api/storage/v1beta1" storagebeta1 "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -239,6 +240,12 @@ func TestCSI_VolumeAll(t *testing.T) {
} }
objs = append(objs, driverInfo) objs = append(objs, driverInfo)
} }
objs = append(objs, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
})
client := fakeclient.NewSimpleClientset(objs...) client := fakeclient.NewSimpleClientset(objs...)
fakeWatcher := watch.NewRaceFreeFake() fakeWatcher := watch.NewRaceFreeFake()
@ -253,13 +260,11 @@ func TestCSI_VolumeAll(t *testing.T) {
host := volumetest.NewFakeVolumeHostWithCSINodeName( host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir, tmpDir,
client, client,
nil, ProbeVolumePlugins(),
"csi-node", "fakeNode",
csiDriverInformer.Lister(), csiDriverInformer.Lister(),
) )
plugMgr := host.GetPluginMgr()
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
csiClient := setupClient(t, true) csiClient := setupClient(t, true)
volSpec := test.specFunc(test.specName, test.driver, test.volName) volSpec := test.specFunc(test.specName, test.driver, test.volName)

View File

@ -10,6 +10,8 @@ go_library(
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library", "//pkg/volume/csi:go_default_library",
"//pkg/volume/testing:go_default_library", "//pkg/volume/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@ -17,6 +17,10 @@ limitations under the License.
package testing package testing
import ( import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@ -26,7 +30,6 @@ import (
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/csi"
volumetest "k8s.io/kubernetes/pkg/volume/testing" volumetest "k8s.io/kubernetes/pkg/volume/testing"
"testing"
) )
// NewTestPlugin creates a plugin mgr to load plugins and setup a fake client // NewTestPlugin creates a plugin mgr to load plugins and setup a fake client
@ -40,6 +43,13 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl
client = fakeclient.NewSimpleClientset() client = fakeclient.NewSimpleClientset()
} }
client.Tracker().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fakeNode",
},
Spec: v1.NodeSpec{},
})
// Start informer for CSIDrivers. // Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod) factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
@ -49,12 +59,11 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl
host := volumetest.NewFakeVolumeHostWithCSINodeName( host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir, tmpDir,
client, client,
nil, csi.ProbeVolumePlugins(),
"fakeNode", "fakeNode",
csiDriverLister, csiDriverLister,
) )
plugMgr := &volume.VolumePluginMgr{} plugMgr := host.GetPluginMgr()
plugMgr.InitPlugins(csi.ProbeVolumePlugins(), nil /* prober */, host)
plug, err := plugMgr.FindPluginByName(csi.CSIPluginName) plug, err := plugMgr.FindPluginByName(csi.CSIPluginName)
if err != nil { if err != nil {

View File

@ -55,11 +55,10 @@ func newPluginMgr(t *testing.T, apiObject runtime.Object) (*volume.VolumePluginM
host := volumetest.NewFakeVolumeHostWithNodeLabels( host := volumetest.NewFakeVolumeHostWithNodeLabels(
tmpDir, tmpDir,
fakeClient, fakeClient,
nil, ProbeVolumePlugins(),
map[string]string{sdcGUIDLabelName: "abc-123"}, map[string]string{sdcGUIDLabelName: "abc-123"},
) )
plugMgr := &volume.VolumePluginMgr{} plugMgr := host.GetPluginMgr()
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
return plugMgr, tmpDir return plugMgr, tmpDir
} }

View File

@ -25,6 +25,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",

View File

@ -37,6 +37,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
storagelistersv1 "k8s.io/client-go/listers/storage/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1"
@ -71,7 +72,7 @@ const (
type fakeVolumeHost struct { type fakeVolumeHost struct {
rootDir string rootDir string
kubeClient clientset.Interface kubeClient clientset.Interface
pluginMgr VolumePluginMgr pluginMgr *VolumePluginMgr
cloud cloudprovider.Interface cloud cloudprovider.Interface
mounter mount.Interface mounter mount.Interface
hostUtil hostutil.HostUtils hostUtil hostutil.HostUtils
@ -81,47 +82,48 @@ type fakeVolumeHost struct {
subpather subpath.Interface subpather subpath.Interface
csiDriverLister storagelisters.CSIDriverLister csiDriverLister storagelisters.CSIDriverLister
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
kubeletErr error
mux sync.Mutex
} }
var _ VolumeHost = &fakeVolumeHost{} var _ VolumeHost = &fakeVolumeHost{}
var _ AttachDetachVolumeHost = &fakeVolumeHost{} var _ AttachDetachVolumeHost = &fakeVolumeHost{}
func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { func NewFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) return newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, "", nil)
} }
func NewFakeVolumeHostWithCloudProvider(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { func NewFakeVolumeHostWithCloudProvider(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost {
return newFakeVolumeHost(rootDir, kubeClient, plugins, cloud, nil) return newFakeVolumeHost(rootDir, kubeClient, plugins, cloud, nil, "", nil)
} }
func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost { func NewFakeVolumeHostWithNodeLabels(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost {
volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, "", nil)
volHost.nodeLabels = labels volHost.nodeLabels = labels
return volHost return volHost
} }
func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost { func NewFakeVolumeHostWithCSINodeName(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost {
volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil) volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister)
volHost.nodeName = nodeName
if driverLister != nil {
volHost.csiDriverLister = driverLister
}
return volHost return volHost
} }
func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost { func newFakeVolumeHost(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelisters.CSIDriverLister) *fakeVolumeHost {
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud} host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister}
host.mounter = mount.NewFakeMounter(nil) host.mounter = mount.NewFakeMounter(nil)
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap) host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
host.exec = &testingexec.FakeExec{DisableScripts: true} host.exec = &testingexec.FakeExec{DisableScripts: true}
host.pluginMgr = &VolumePluginMgr{}
host.pluginMgr.InitPlugins(plugins, nil /* prober */, host) host.pluginMgr.InitPlugins(plugins, nil /* prober */, host)
host.subpather = &subpath.FakeSubpath{} host.subpather = &subpath.FakeSubpath{}
host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute) host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
// Wait until the InitPlugins setup is finished before returning from this setup func
host.WaitForKubeletErrNil()
return host return host
} }
func NewFakeVolumeHostWithMounterFSType(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost { func NewFakeVolumeHostWithMounterFSType(rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost {
volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, pathToTypeMap) volHost := newFakeVolumeHost(rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil)
return volHost return volHost
} }
@ -169,6 +171,10 @@ func (f *fakeVolumeHost) GetSubpather() subpath.Interface {
return f.subpather return f.subpather
} }
func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
return f.pluginMgr
}
func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) { func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}" // The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
wrapperVolumeName := "wrapped_" + volName wrapperVolumeName := "wrapped_" + volName
@ -1519,17 +1525,13 @@ func VerifyGetMapPodDeviceCallCount(
// manager and fake volume plugin using a fake volume host. // manager and fake volume plugin using a fake volume host.
func GetTestVolumePluginMgr( func GetTestVolumePluginMgr(
t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin) { t *testing.T) (*VolumePluginMgr, *FakeVolumePlugin) {
v := NewFakeVolumeHost(
"", /* rootDir */
nil, /* kubeClient */
nil, /* plugins */
)
plugins := ProbeVolumePlugins(VolumeConfig{}) plugins := ProbeVolumePlugins(VolumeConfig{})
if err := v.pluginMgr.InitPlugins(plugins, nil /* prober */, v); err != nil { v := NewFakeVolumeHost(
t.Fatal(err) "", /* rootDir */
} nil, /* kubeClient */
plugins, /* plugins */
return &v.pluginMgr, plugins[0].(*FakeVolumePlugin) )
return v.pluginMgr, plugins[0].(*FakeVolumePlugin)
} }
// CreateTestPVC returns a provisionable PVC for tests // CreateTestPVC returns a provisionable PVC for tests
@ -1593,9 +1595,20 @@ func (f *fakeVolumeHost) IsAttachDetachController() bool {
} }
func (f *fakeVolumeHost) SetKubeletError(err error) { func (f *fakeVolumeHost) SetKubeletError(err error) {
f.mux.Lock()
defer f.mux.Unlock()
f.kubeletErr = err
return return
} }
func (f *fakeVolumeHost) WaitForCacheSync() error { func (f *fakeVolumeHost) WaitForCacheSync() error {
return nil return nil
} }
func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
f.mux.Lock()
defer f.mux.Unlock()
return f.kubeletErr == nil, nil
})
}

View File

@ -168,8 +168,9 @@ func TestPodDeletionWithDswp(t *testing.T) {
t.Fatalf("Failed to created node : %v", err) t.Fatalf("Failed to created node : %v", err)
} }
go informers.Core().V1().Nodes().Informer().Run(podStopCh) stopCh := make(chan struct{})
go informers.Core().V1().Nodes().Informer().Run(stopCh)
if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil { if _, err := testClient.CoreV1().Pods(ns.Name).Create(pod); err != nil {
t.Errorf("Failed to create pod : %v", err) t.Errorf("Failed to create pod : %v", err)
} }
@ -178,11 +179,11 @@ func TestPodDeletionWithDswp(t *testing.T) {
go podInformer.Run(podStopCh) go podInformer.Run(podStopCh)
// start controller loop // start controller loop
stopCh := make(chan struct{})
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
initCSIObjects(stopCh, informers) initCSIObjects(stopCh, informers)
go ctrl.Run(stopCh) go ctrl.Run(stopCh)
defer close(stopCh)
waitToObservePods(t, podInformer, 1) waitToObservePods(t, podInformer, 1)
podKey, err := cache.MetaNamespaceKeyFunc(pod) podKey, err := cache.MetaNamespaceKeyFunc(pod)
@ -207,13 +208,12 @@ func TestPodDeletionWithDswp(t *testing.T) {
waitToObservePods(t, podInformer, 0) waitToObservePods(t, podInformer, 0)
// the populator loop turns every 1 minute // the populator loop turns every 1 minute
waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 80*time.Second, "expected 0 pods in dsw after pod delete", 0) waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 80*time.Second, "expected 0 pods in dsw after pod delete", 0)
close(stopCh)
} }
func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) { func initCSIObjects(stopCh chan struct{}, informers clientgoinformers.SharedInformerFactory) {
if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) { utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
go informers.Storage().V1beta1().CSINodes().Informer().Run(stopCh) go informers.Storage().V1().CSINodes().Informer().Run(stopCh)
} }
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh) go informers.Storage().V1beta1().CSIDrivers().Informer().Run(stopCh)