mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
CSI: Modify VolumeAttachment check to use Informer/Cache
Change-Id: Ie70c8b6657c67eefbf13042f36d56ca84a2e42bb
This commit is contained in:
parent
06e3aeccc4
commit
2a89577659
@ -332,6 +332,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
csiNodeInformer,
|
||||
csiDriverInformer,
|
||||
ctx.InformerFactory.Storage().V1().VolumeAttachments(),
|
||||
ctx.Cloud,
|
||||
plugins,
|
||||
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
|
||||
|
@ -111,6 +111,7 @@ func NewAttachDetachController(
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
csiNodeInformer storageinformersv1.CSINodeInformer,
|
||||
csiDriverInformer storageinformersv1.CSIDriverInformer,
|
||||
volumeAttachmentInformer storageinformersv1.VolumeAttachmentInformer,
|
||||
cloud cloudprovider.Interface,
|
||||
plugins []volume.VolumePlugin,
|
||||
prober volume.DynamicPluginProber,
|
||||
@ -142,6 +143,9 @@ func NewAttachDetachController(
|
||||
adc.csiDriverLister = csiDriverInformer.Lister()
|
||||
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
|
||||
|
||||
adc.volumeAttachmentLister = volumeAttachmentInformer.Lister()
|
||||
adc.volumeAttachmentSynced = volumeAttachmentInformer.Informer().HasSynced
|
||||
|
||||
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
||||
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
|
||||
}
|
||||
@ -254,6 +258,12 @@ type attachDetachController struct {
|
||||
csiDriverLister storagelistersv1.CSIDriverLister
|
||||
csiDriversSynced kcache.InformerSynced
|
||||
|
||||
// volumeAttachmentLister is the shared volumeAttachment lister used to fetch and store
|
||||
// VolumeAttachment objects from the API server. It is shared with other controllers
|
||||
// and therefore the VolumeAttachment objects in its store should be treated as immutable.
|
||||
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
||||
volumeAttachmentSynced kcache.InformerSynced
|
||||
|
||||
// cloud provider used by volume host
|
||||
cloud cloudprovider.Interface
|
||||
|
||||
@ -319,6 +329,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||
if adc.csiDriversSynced != nil {
|
||||
synced = append(synced, adc.csiDriversSynced)
|
||||
}
|
||||
if adc.volumeAttachmentSynced != nil {
|
||||
synced = append(synced, adc.volumeAttachmentSynced)
|
||||
}
|
||||
|
||||
if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
|
||||
return
|
||||
@ -675,6 +688,10 @@ func (adc *attachDetachController) IsAttachDetachController() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
|
||||
return adc.volumeAttachmentLister
|
||||
}
|
||||
|
||||
// VolumeHost implementation
|
||||
// This is an unfortunate requirement of the current factoring of volume plugin
|
||||
// initializing code. It requires kubelet specific methods used by the mounting
|
||||
|
@ -48,6 +48,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Storage().V1().CSINodes(),
|
||||
informerFactory.Storage().V1().CSIDrivers(),
|
||||
informerFactory.Storage().V1().VolumeAttachments(),
|
||||
nil, /* cloud */
|
||||
nil, /* plugins */
|
||||
nil, /* prober */
|
||||
@ -168,6 +169,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Storage().V1().CSINodes(),
|
||||
informerFactory.Storage().V1().CSIDrivers(),
|
||||
informerFactory.Storage().V1().VolumeAttachments(),
|
||||
nil, /* cloud */
|
||||
plugins,
|
||||
prober,
|
||||
|
@ -79,8 +79,10 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
|
@ -197,8 +197,19 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
|
||||
}
|
||||
|
||||
attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
|
||||
var attach *storage.VolumeAttachment
|
||||
if c.plugin.volumeAttachmentLister != nil {
|
||||
attach, err = c.plugin.volumeAttachmentLister.Get(attachID)
|
||||
if err == nil {
|
||||
attached[spec] = attach.Status.Attached
|
||||
continue
|
||||
}
|
||||
klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err))
|
||||
}
|
||||
// The cache lookup is not setup or the object is not found in the cache.
|
||||
// Get the object from the API server.
|
||||
klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
|
||||
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
|
||||
attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
|
||||
if err != nil {
|
||||
attached[spec] = false
|
||||
klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
|
||||
|
@ -38,8 +38,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
storageinformer "k8s.io/client-go/informers/storage/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||
storagelister "k8s.io/client-go/listers/storage/v1"
|
||||
core "k8s.io/client-go/testing"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
@ -106,9 +108,11 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if watch != nil {
|
||||
watch.Modify(attach)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAttacherAttach(t *testing.T) {
|
||||
testCases := []struct {
|
||||
@ -197,7 +201,7 @@ func TestAttacherAttach(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Logf("test case: %s", tc.name)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -281,7 +285,7 @@ func TestAttacherAttachWithInline(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Logf("test case: %s", tc.name)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -349,7 +353,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
|
||||
getTestCSIDriver("attachable", nil, &bTrue, nil),
|
||||
getTestCSIDriver("nil", nil, nil, nil),
|
||||
)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeClient)
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -390,7 +394,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
|
||||
status := storage.VolumeAttachmentStatus{
|
||||
Attached: true,
|
||||
}
|
||||
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
|
||||
markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
@ -515,7 +519,7 @@ func TestAttacherWaitForAttach(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -597,7 +601,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -684,7 +688,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
attacher, err := plug.NewAttacher()
|
||||
@ -941,7 +945,7 @@ func TestAttacherDetach(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Logf("running test: %v", tc.name)
|
||||
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
if tc.reactor != nil {
|
||||
client.PrependReactor("*", "*", tc.reactor)
|
||||
@ -998,7 +1002,7 @@ func TestAttacherDetach(t *testing.T) {
|
||||
func TestAttacherGetDeviceMountPath(t *testing.T) {
|
||||
// Setup
|
||||
// Create a new attacher
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
attacher, err0 := plug.NewAttacher()
|
||||
if err0 != nil {
|
||||
@ -1163,7 +1167,7 @@ func TestAttacherMountDevice(t *testing.T) {
|
||||
|
||||
// Setup
|
||||
// Create a new attacher
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
attacher, err0 := plug.NewAttacher()
|
||||
if err0 != nil {
|
||||
@ -1314,7 +1318,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) {
|
||||
|
||||
// Setup
|
||||
// Create a new attacher
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
attacher, err0 := plug.NewAttacher()
|
||||
if err0 != nil {
|
||||
@ -1442,7 +1446,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
|
||||
t.Logf("Running test case: %s", tc.testName)
|
||||
// Setup
|
||||
// Create a new attacher
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
attacher, err0 := plug.NewAttacher()
|
||||
if err0 != nil {
|
||||
@ -1529,7 +1533,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
|
||||
}
|
||||
|
||||
// create a plugin mgr to load plugins and setup a fake client
|
||||
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
|
||||
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||
if err != nil {
|
||||
t.Fatalf("can't create temp dir: %v", err)
|
||||
@ -1545,12 +1549,24 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
||||
Spec: v1.NodeSpec{},
|
||||
})
|
||||
fakeWatcher := watch.NewRaceFreeFake()
|
||||
if !setupInformer {
|
||||
// TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer
|
||||
// and the csiAttacher.Attach both endup reading from same channel causing hang in Attach().
|
||||
// So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer.
|
||||
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
|
||||
}
|
||||
|
||||
// Start informer for CSIDrivers.
|
||||
factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod)
|
||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||
csiDriverLister := csiDriverInformer.Lister()
|
||||
var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer
|
||||
var volumeAttachmentLister storagelister.VolumeAttachmentLister
|
||||
if setupInformer {
|
||||
volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments()
|
||||
volumeAttachmentLister = volumeAttachmentInformer.Lister()
|
||||
}
|
||||
|
||||
factory.Start(wait.NeverStop)
|
||||
|
||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
||||
@ -1559,6 +1575,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
||||
ProbeVolumePlugins(),
|
||||
"fakeNode",
|
||||
csiDriverLister,
|
||||
volumeAttachmentLister,
|
||||
)
|
||||
plugMgr := host.GetPluginMgr()
|
||||
|
||||
@ -1577,5 +1594,10 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
||||
return csiDriverInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
|
||||
if volumeAttachmentInformer != nil {
|
||||
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
|
||||
return volumeAttachmentInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
}
|
||||
return csiPlug, fakeWatcher, tmpDir, fakeClient
|
||||
}
|
||||
|
@ -62,6 +62,7 @@ type csiPlugin struct {
|
||||
host volume.VolumeHost
|
||||
blockEnabled bool
|
||||
csiDriverLister storagelisters.CSIDriverLister
|
||||
volumeAttachmentLister storagelisters.VolumeAttachmentLister
|
||||
}
|
||||
|
||||
// ProbeVolumePlugins returns implemented plugins
|
||||
@ -186,13 +187,17 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
||||
if csiClient == nil {
|
||||
klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
|
||||
} else {
|
||||
// set CSIDriverLister
|
||||
// set CSIDriverLister and volumeAttachmentLister
|
||||
adcHost, ok := host.(volume.AttachDetachVolumeHost)
|
||||
if ok {
|
||||
p.csiDriverLister = adcHost.CSIDriverLister()
|
||||
if p.csiDriverLister == nil {
|
||||
klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
|
||||
}
|
||||
p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
|
||||
if p.volumeAttachmentLister == nil {
|
||||
klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
|
||||
}
|
||||
}
|
||||
kletHost, ok := host.(volume.KubeletVolumeHost)
|
||||
if ok {
|
||||
@ -200,6 +205,8 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
||||
if p.csiDriverLister == nil {
|
||||
klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
|
||||
}
|
||||
// We don't run the volumeAttachmentLister in the kubelet context
|
||||
p.volumeAttachmentLister = nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -62,6 +62,8 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
||||
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
|
||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||
csiDriverLister := csiDriverInformer.Lister()
|
||||
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
|
||||
volumeAttachmentLister := volumeAttachmentInformer.Lister()
|
||||
go factory.Start(wait.NeverStop)
|
||||
|
||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
||||
@ -70,6 +72,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
||||
ProbeVolumePlugins(),
|
||||
"fakeNode",
|
||||
csiDriverLister,
|
||||
volumeAttachmentLister,
|
||||
)
|
||||
|
||||
pluginMgr := host.GetPluginMgr()
|
||||
@ -88,6 +91,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
||||
return csiDriverInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
|
||||
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
|
||||
return volumeAttachmentInformer.Informer().HasSynced(), nil
|
||||
})
|
||||
return csiPlug, tmpDir
|
||||
}
|
||||
|
||||
@ -1018,6 +1024,7 @@ func TestPluginFindAttachablePlugin(t *testing.T) {
|
||||
ProbeVolumePlugins(),
|
||||
"fakeNode",
|
||||
factory.Storage().V1().CSIDrivers().Lister(),
|
||||
factory.Storage().V1().VolumeAttachments().Lister(),
|
||||
)
|
||||
|
||||
plugMgr := host.GetPluginMgr()
|
||||
@ -1137,7 +1144,7 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) {
|
||||
Spec: v1.NodeSpec{},
|
||||
},
|
||||
)
|
||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil)
|
||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil, nil)
|
||||
plugMgr := host.GetPluginMgr()
|
||||
plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec)
|
||||
if err != nil && !test.shouldFail {
|
||||
|
@ -250,6 +250,7 @@ func TestCSI_VolumeAll(t *testing.T) {
|
||||
|
||||
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
|
||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
|
||||
if driverInfo != nil {
|
||||
csiDriverInformer.Informer().GetStore().Add(driverInfo)
|
||||
}
|
||||
@ -261,6 +262,7 @@ func TestCSI_VolumeAll(t *testing.T) {
|
||||
ProbeVolumePlugins(),
|
||||
"fakeNode",
|
||||
csiDriverInformer.Lister(),
|
||||
volumeAttachmentInformer.Lister(),
|
||||
)
|
||||
plugMgr := host.GetPluginMgr()
|
||||
csiClient := setupClient(t, true)
|
||||
|
@ -969,6 +969,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
|
||||
nil,
|
||||
nodeName,
|
||||
nil,
|
||||
nil,
|
||||
)
|
||||
|
||||
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
||||
@ -1030,6 +1031,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
|
||||
nil,
|
||||
nodeName,
|
||||
nil,
|
||||
nil,
|
||||
)
|
||||
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
||||
|
||||
|
@ -60,6 +60,7 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl
|
||||
csi.ProbeVolumePlugins(),
|
||||
"fakeNode",
|
||||
csiDriverLister,
|
||||
nil,
|
||||
)
|
||||
plugMgr := host.GetPluginMgr()
|
||||
|
||||
|
@ -351,6 +351,8 @@ type AttachDetachVolumeHost interface {
|
||||
// CSIDriverLister returns the informer lister for the CSIDriver API Object
|
||||
CSIDriverLister() storagelistersv1.CSIDriverLister
|
||||
|
||||
// VolumeAttachmentLister returns the informer lister for the VolumeAttachment API Object
|
||||
VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister
|
||||
// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
|
||||
// to the attachDetachController
|
||||
IsAttachDetachController() bool
|
||||
|
@ -111,6 +111,7 @@ type fakeVolumeHost struct {
|
||||
nodeName string
|
||||
subpather subpath.Interface
|
||||
csiDriverLister storagelistersv1.CSIDriverLister
|
||||
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
||||
informerFactory informers.SharedInformerFactory
|
||||
kubeletErr error
|
||||
mux sync.Mutex
|
||||
@ -120,29 +121,29 @@ var _ VolumeHost = &fakeVolumeHost{}
|
||||
var _ AttachDetachVolumeHost = &fakeVolumeHost{}
|
||||
|
||||
func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil)
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||
}
|
||||
|
||||
func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost {
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil)
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
|
||||
}
|
||||
|
||||
func NewFakeVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost {
|
||||
volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil)
|
||||
volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||
volHost.nodeLabels = labels
|
||||
return volHost
|
||||
}
|
||||
|
||||
func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost {
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister)
|
||||
func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost {
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
|
||||
}
|
||||
|
||||
func NewFakeVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost {
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil)
|
||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
|
||||
}
|
||||
|
||||
func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost {
|
||||
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister}
|
||||
func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost {
|
||||
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister}
|
||||
host.mounter = mount.NewFakeMounter(nil)
|
||||
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
|
||||
host.exec = &testingexec.FakeExec{DisableScripts: true}
|
||||
@ -1840,6 +1841,10 @@ func (f *fakeVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
|
||||
return f.csiDriverLister
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
|
||||
return f.volumeAttachmentLister
|
||||
}
|
||||
|
||||
func (f *fakeVolumeHost) CSIDriversSynced() cache.InformerSynced {
|
||||
// not needed for testing
|
||||
return nil
|
||||
|
@ -180,6 +180,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
|
||||
// start controller loop
|
||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||
initCSIObjects(stopCh, informers)
|
||||
go ctrl.Run(stopCh)
|
||||
defer close(stopCh)
|
||||
@ -256,6 +257,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||
initCSIObjects(stopCh, informers)
|
||||
go ctrl.Run(stopCh)
|
||||
|
||||
@ -325,6 +327,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||
initCSIObjects(stopCh, informers)
|
||||
go ctrl.Run(stopCh)
|
||||
|
||||
@ -429,6 +432,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
||||
informers.Core().V1().PersistentVolumes(),
|
||||
informers.Storage().V1().CSINodes(),
|
||||
informers.Storage().V1().CSIDrivers(),
|
||||
informers.Storage().V1().VolumeAttachments(),
|
||||
cloud,
|
||||
plugins,
|
||||
nil, /* prober */
|
||||
@ -504,6 +508,7 @@ func TestPodAddedByDswp(t *testing.T) {
|
||||
stopCh := make(chan struct{})
|
||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||
initCSIObjects(stopCh, informers)
|
||||
go ctrl.Run(stopCh)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user