mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #91307 from yuga711/attach
CSI: Modify VolumeAttachment check to use Informer/Cache
This commit is contained in:
commit
98f250f883
@ -332,6 +332,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
|
|||||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||||
csiNodeInformer,
|
csiNodeInformer,
|
||||||
csiDriverInformer,
|
csiDriverInformer,
|
||||||
|
ctx.InformerFactory.Storage().V1().VolumeAttachments(),
|
||||||
ctx.Cloud,
|
ctx.Cloud,
|
||||||
plugins,
|
plugins,
|
||||||
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
|
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
|
||||||
|
@ -111,6 +111,7 @@ func NewAttachDetachController(
|
|||||||
pvInformer coreinformers.PersistentVolumeInformer,
|
pvInformer coreinformers.PersistentVolumeInformer,
|
||||||
csiNodeInformer storageinformersv1.CSINodeInformer,
|
csiNodeInformer storageinformersv1.CSINodeInformer,
|
||||||
csiDriverInformer storageinformersv1.CSIDriverInformer,
|
csiDriverInformer storageinformersv1.CSIDriverInformer,
|
||||||
|
volumeAttachmentInformer storageinformersv1.VolumeAttachmentInformer,
|
||||||
cloud cloudprovider.Interface,
|
cloud cloudprovider.Interface,
|
||||||
plugins []volume.VolumePlugin,
|
plugins []volume.VolumePlugin,
|
||||||
prober volume.DynamicPluginProber,
|
prober volume.DynamicPluginProber,
|
||||||
@ -142,6 +143,9 @@ func NewAttachDetachController(
|
|||||||
adc.csiDriverLister = csiDriverInformer.Lister()
|
adc.csiDriverLister = csiDriverInformer.Lister()
|
||||||
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
|
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
|
||||||
|
|
||||||
|
adc.volumeAttachmentLister = volumeAttachmentInformer.Lister()
|
||||||
|
adc.volumeAttachmentSynced = volumeAttachmentInformer.Informer().HasSynced
|
||||||
|
|
||||||
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
||||||
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
|
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
|
||||||
}
|
}
|
||||||
@ -254,6 +258,12 @@ type attachDetachController struct {
|
|||||||
csiDriverLister storagelistersv1.CSIDriverLister
|
csiDriverLister storagelistersv1.CSIDriverLister
|
||||||
csiDriversSynced kcache.InformerSynced
|
csiDriversSynced kcache.InformerSynced
|
||||||
|
|
||||||
|
// volumeAttachmentLister is the shared volumeAttachment lister used to fetch and store
|
||||||
|
// VolumeAttachment objects from the API server. It is shared with other controllers
|
||||||
|
// and therefore the VolumeAttachment objects in its store should be treated as immutable.
|
||||||
|
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
||||||
|
volumeAttachmentSynced kcache.InformerSynced
|
||||||
|
|
||||||
// cloud provider used by volume host
|
// cloud provider used by volume host
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
|
|
||||||
@ -319,6 +329,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
|||||||
if adc.csiDriversSynced != nil {
|
if adc.csiDriversSynced != nil {
|
||||||
synced = append(synced, adc.csiDriversSynced)
|
synced = append(synced, adc.csiDriversSynced)
|
||||||
}
|
}
|
||||||
|
if adc.volumeAttachmentSynced != nil {
|
||||||
|
synced = append(synced, adc.volumeAttachmentSynced)
|
||||||
|
}
|
||||||
|
|
||||||
if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
|
if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
|
||||||
return
|
return
|
||||||
@ -675,6 +688,10 @@ func (adc *attachDetachController) IsAttachDetachController() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (adc *attachDetachController) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
|
||||||
|
return adc.volumeAttachmentLister
|
||||||
|
}
|
||||||
|
|
||||||
// VolumeHost implementation
|
// VolumeHost implementation
|
||||||
// This is an unfortunate requirement of the current factoring of volume plugin
|
// This is an unfortunate requirement of the current factoring of volume plugin
|
||||||
// initializing code. It requires kubelet specific methods used by the mounting
|
// initializing code. It requires kubelet specific methods used by the mounting
|
||||||
|
@ -48,6 +48,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
|
|||||||
informerFactory.Core().V1().PersistentVolumes(),
|
informerFactory.Core().V1().PersistentVolumes(),
|
||||||
informerFactory.Storage().V1().CSINodes(),
|
informerFactory.Storage().V1().CSINodes(),
|
||||||
informerFactory.Storage().V1().CSIDrivers(),
|
informerFactory.Storage().V1().CSIDrivers(),
|
||||||
|
informerFactory.Storage().V1().VolumeAttachments(),
|
||||||
nil, /* cloud */
|
nil, /* cloud */
|
||||||
nil, /* plugins */
|
nil, /* plugins */
|
||||||
nil, /* prober */
|
nil, /* prober */
|
||||||
@ -168,6 +169,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
|
|||||||
informerFactory.Core().V1().PersistentVolumes(),
|
informerFactory.Core().V1().PersistentVolumes(),
|
||||||
informerFactory.Storage().V1().CSINodes(),
|
informerFactory.Storage().V1().CSINodes(),
|
||||||
informerFactory.Storage().V1().CSIDrivers(),
|
informerFactory.Storage().V1().CSIDrivers(),
|
||||||
|
informerFactory.Storage().V1().VolumeAttachments(),
|
||||||
nil, /* cloud */
|
nil, /* cloud */
|
||||||
plugins,
|
plugins,
|
||||||
prober,
|
prober,
|
||||||
|
@ -79,8 +79,10 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||||
|
@ -197,8 +197,19 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
|
|||||||
}
|
}
|
||||||
|
|
||||||
attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
|
attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
|
||||||
|
var attach *storage.VolumeAttachment
|
||||||
|
if c.plugin.volumeAttachmentLister != nil {
|
||||||
|
attach, err = c.plugin.volumeAttachmentLister.Get(attachID)
|
||||||
|
if err == nil {
|
||||||
|
attached[spec] = attach.Status.Attached
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err))
|
||||||
|
}
|
||||||
|
// The cache lookup is not setup or the object is not found in the cache.
|
||||||
|
// Get the object from the API server.
|
||||||
klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
|
klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
|
||||||
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
|
attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
attached[spec] = false
|
attached[spec] = false
|
||||||
klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
|
klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
|
||||||
|
@ -38,8 +38,10 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
|
storageinformer "k8s.io/client-go/informers/storage/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
fakeclient "k8s.io/client-go/kubernetes/fake"
|
fakeclient "k8s.io/client-go/kubernetes/fake"
|
||||||
|
storagelister "k8s.io/client-go/listers/storage/v1"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
utiltesting "k8s.io/client-go/util/testing"
|
utiltesting "k8s.io/client-go/util/testing"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
@ -106,7 +108,9 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
watch.Modify(attach)
|
if watch != nil {
|
||||||
|
watch.Modify(attach)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,7 +201,7 @@ func TestAttacherAttach(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
t.Logf("test case: %s", tc.name)
|
t.Logf("test case: %s", tc.name)
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -281,7 +285,7 @@ func TestAttacherAttachWithInline(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
t.Logf("test case: %s", tc.name)
|
t.Logf("test case: %s", tc.name)
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -349,7 +353,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
|
|||||||
getTestCSIDriver("attachable", nil, &bTrue, nil),
|
getTestCSIDriver("attachable", nil, &bTrue, nil),
|
||||||
getTestCSIDriver("nil", nil, nil, nil),
|
getTestCSIDriver("nil", nil, nil, nil),
|
||||||
)
|
)
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeClient)
|
plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -390,7 +394,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
|
|||||||
status := storage.VolumeAttachmentStatus{
|
status := storage.VolumeAttachmentStatus{
|
||||||
Attached: true,
|
Attached: true,
|
||||||
}
|
}
|
||||||
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
|
markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
})
|
})
|
||||||
@ -515,7 +519,7 @@ func TestAttacherWaitForAttach(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -597,7 +601,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -684,7 +688,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
|
|||||||
|
|
||||||
for i, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
attacher, err := plug.NewAttacher()
|
attacher, err := plug.NewAttacher()
|
||||||
@ -941,7 +945,7 @@ func TestAttacherDetach(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
t.Logf("running test: %v", tc.name)
|
t.Logf("running test: %v", tc.name)
|
||||||
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
if tc.reactor != nil {
|
if tc.reactor != nil {
|
||||||
client.PrependReactor("*", "*", tc.reactor)
|
client.PrependReactor("*", "*", tc.reactor)
|
||||||
@ -998,7 +1002,7 @@ func TestAttacherDetach(t *testing.T) {
|
|||||||
func TestAttacherGetDeviceMountPath(t *testing.T) {
|
func TestAttacherGetDeviceMountPath(t *testing.T) {
|
||||||
// Setup
|
// Setup
|
||||||
// Create a new attacher
|
// Create a new attacher
|
||||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
attacher, err0 := plug.NewAttacher()
|
attacher, err0 := plug.NewAttacher()
|
||||||
if err0 != nil {
|
if err0 != nil {
|
||||||
@ -1163,7 +1167,7 @@ func TestAttacherMountDevice(t *testing.T) {
|
|||||||
|
|
||||||
// Setup
|
// Setup
|
||||||
// Create a new attacher
|
// Create a new attacher
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
attacher, err0 := plug.NewAttacher()
|
attacher, err0 := plug.NewAttacher()
|
||||||
if err0 != nil {
|
if err0 != nil {
|
||||||
@ -1314,7 +1318,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) {
|
|||||||
|
|
||||||
// Setup
|
// Setup
|
||||||
// Create a new attacher
|
// Create a new attacher
|
||||||
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
attacher, err0 := plug.NewAttacher()
|
attacher, err0 := plug.NewAttacher()
|
||||||
if err0 != nil {
|
if err0 != nil {
|
||||||
@ -1442,7 +1446,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
|
|||||||
t.Logf("Running test case: %s", tc.testName)
|
t.Logf("Running test case: %s", tc.testName)
|
||||||
// Setup
|
// Setup
|
||||||
// Create a new attacher
|
// Create a new attacher
|
||||||
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
|
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
attacher, err0 := plug.NewAttacher()
|
attacher, err0 := plug.NewAttacher()
|
||||||
if err0 != nil {
|
if err0 != nil {
|
||||||
@ -1529,7 +1533,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create a plugin mgr to load plugins and setup a fake client
|
// create a plugin mgr to load plugins and setup a fake client
|
||||||
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
|
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
|
||||||
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
tmpDir, err := utiltesting.MkTmpdir("csi-test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("can't create temp dir: %v", err)
|
t.Fatalf("can't create temp dir: %v", err)
|
||||||
@ -1545,12 +1549,24 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
|||||||
Spec: v1.NodeSpec{},
|
Spec: v1.NodeSpec{},
|
||||||
})
|
})
|
||||||
fakeWatcher := watch.NewRaceFreeFake()
|
fakeWatcher := watch.NewRaceFreeFake()
|
||||||
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
|
if !setupInformer {
|
||||||
|
// TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer
|
||||||
|
// and the csiAttacher.Attach both endup reading from same channel causing hang in Attach().
|
||||||
|
// So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer.
|
||||||
|
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
|
||||||
|
}
|
||||||
|
|
||||||
// Start informer for CSIDrivers.
|
// Start informer for CSIDrivers.
|
||||||
factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod)
|
factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod)
|
||||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||||
csiDriverLister := csiDriverInformer.Lister()
|
csiDriverLister := csiDriverInformer.Lister()
|
||||||
|
var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer
|
||||||
|
var volumeAttachmentLister storagelister.VolumeAttachmentLister
|
||||||
|
if setupInformer {
|
||||||
|
volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments()
|
||||||
|
volumeAttachmentLister = volumeAttachmentInformer.Lister()
|
||||||
|
}
|
||||||
|
|
||||||
factory.Start(wait.NeverStop)
|
factory.Start(wait.NeverStop)
|
||||||
|
|
||||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
||||||
@ -1559,6 +1575,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
|||||||
ProbeVolumePlugins(),
|
ProbeVolumePlugins(),
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
csiDriverLister,
|
csiDriverLister,
|
||||||
|
volumeAttachmentLister,
|
||||||
)
|
)
|
||||||
plugMgr := host.GetPluginMgr()
|
plugMgr := host.GetPluginMgr()
|
||||||
|
|
||||||
@ -1577,5 +1594,10 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
|
|||||||
return csiDriverInformer.Informer().HasSynced(), nil
|
return csiDriverInformer.Informer().HasSynced(), nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if volumeAttachmentInformer != nil {
|
||||||
|
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
|
||||||
|
return volumeAttachmentInformer.Informer().HasSynced(), nil
|
||||||
|
})
|
||||||
|
}
|
||||||
return csiPlug, fakeWatcher, tmpDir, fakeClient
|
return csiPlug, fakeWatcher, tmpDir, fakeClient
|
||||||
}
|
}
|
||||||
|
@ -59,9 +59,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type csiPlugin struct {
|
type csiPlugin struct {
|
||||||
host volume.VolumeHost
|
host volume.VolumeHost
|
||||||
blockEnabled bool
|
blockEnabled bool
|
||||||
csiDriverLister storagelisters.CSIDriverLister
|
csiDriverLister storagelisters.CSIDriverLister
|
||||||
|
volumeAttachmentLister storagelisters.VolumeAttachmentLister
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProbeVolumePlugins returns implemented plugins
|
// ProbeVolumePlugins returns implemented plugins
|
||||||
@ -186,13 +187,17 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||||||
if csiClient == nil {
|
if csiClient == nil {
|
||||||
klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
|
klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
|
||||||
} else {
|
} else {
|
||||||
// set CSIDriverLister
|
// set CSIDriverLister and volumeAttachmentLister
|
||||||
adcHost, ok := host.(volume.AttachDetachVolumeHost)
|
adcHost, ok := host.(volume.AttachDetachVolumeHost)
|
||||||
if ok {
|
if ok {
|
||||||
p.csiDriverLister = adcHost.CSIDriverLister()
|
p.csiDriverLister = adcHost.CSIDriverLister()
|
||||||
if p.csiDriverLister == nil {
|
if p.csiDriverLister == nil {
|
||||||
klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
|
klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
|
||||||
}
|
}
|
||||||
|
p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
|
||||||
|
if p.volumeAttachmentLister == nil {
|
||||||
|
klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
kletHost, ok := host.(volume.KubeletVolumeHost)
|
kletHost, ok := host.(volume.KubeletVolumeHost)
|
||||||
if ok {
|
if ok {
|
||||||
@ -200,6 +205,8 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
|
|||||||
if p.csiDriverLister == nil {
|
if p.csiDriverLister == nil {
|
||||||
klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
|
klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
|
||||||
}
|
}
|
||||||
|
// We don't run the volumeAttachmentLister in the kubelet context
|
||||||
|
p.volumeAttachmentLister = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +62,8 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
|||||||
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
|
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
|
||||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||||
csiDriverLister := csiDriverInformer.Lister()
|
csiDriverLister := csiDriverInformer.Lister()
|
||||||
|
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
|
||||||
|
volumeAttachmentLister := volumeAttachmentInformer.Lister()
|
||||||
go factory.Start(wait.NeverStop)
|
go factory.Start(wait.NeverStop)
|
||||||
|
|
||||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
|
||||||
@ -70,6 +72,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
|||||||
ProbeVolumePlugins(),
|
ProbeVolumePlugins(),
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
csiDriverLister,
|
csiDriverLister,
|
||||||
|
volumeAttachmentLister,
|
||||||
)
|
)
|
||||||
|
|
||||||
pluginMgr := host.GetPluginMgr()
|
pluginMgr := host.GetPluginMgr()
|
||||||
@ -88,6 +91,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
|
|||||||
return csiDriverInformer.Informer().HasSynced(), nil
|
return csiDriverInformer.Informer().HasSynced(), nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
|
||||||
|
return volumeAttachmentInformer.Informer().HasSynced(), nil
|
||||||
|
})
|
||||||
return csiPlug, tmpDir
|
return csiPlug, tmpDir
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1018,6 +1024,7 @@ func TestPluginFindAttachablePlugin(t *testing.T) {
|
|||||||
ProbeVolumePlugins(),
|
ProbeVolumePlugins(),
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
factory.Storage().V1().CSIDrivers().Lister(),
|
factory.Storage().V1().CSIDrivers().Lister(),
|
||||||
|
factory.Storage().V1().VolumeAttachments().Lister(),
|
||||||
)
|
)
|
||||||
|
|
||||||
plugMgr := host.GetPluginMgr()
|
plugMgr := host.GetPluginMgr()
|
||||||
@ -1137,7 +1144,7 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) {
|
|||||||
Spec: v1.NodeSpec{},
|
Spec: v1.NodeSpec{},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil)
|
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil, nil)
|
||||||
plugMgr := host.GetPluginMgr()
|
plugMgr := host.GetPluginMgr()
|
||||||
plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec)
|
plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec)
|
||||||
if err != nil && !test.shouldFail {
|
if err != nil && !test.shouldFail {
|
||||||
|
@ -250,6 +250,7 @@ func TestCSI_VolumeAll(t *testing.T) {
|
|||||||
|
|
||||||
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
|
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
|
||||||
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
csiDriverInformer := factory.Storage().V1().CSIDrivers()
|
||||||
|
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
|
||||||
if driverInfo != nil {
|
if driverInfo != nil {
|
||||||
csiDriverInformer.Informer().GetStore().Add(driverInfo)
|
csiDriverInformer.Informer().GetStore().Add(driverInfo)
|
||||||
}
|
}
|
||||||
@ -261,6 +262,7 @@ func TestCSI_VolumeAll(t *testing.T) {
|
|||||||
ProbeVolumePlugins(),
|
ProbeVolumePlugins(),
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
csiDriverInformer.Lister(),
|
csiDriverInformer.Lister(),
|
||||||
|
volumeAttachmentInformer.Lister(),
|
||||||
)
|
)
|
||||||
plugMgr := host.GetPluginMgr()
|
plugMgr := host.GetPluginMgr()
|
||||||
csiClient := setupClient(t, true)
|
csiClient := setupClient(t, true)
|
||||||
|
@ -969,6 +969,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) {
|
|||||||
nil,
|
nil,
|
||||||
nodeName,
|
nodeName,
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
||||||
@ -1030,6 +1031,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
|
|||||||
nil,
|
nil,
|
||||||
nodeName,
|
nodeName,
|
||||||
nil,
|
nil,
|
||||||
|
nil,
|
||||||
)
|
)
|
||||||
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil)
|
||||||
|
|
||||||
|
@ -60,6 +60,7 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl
|
|||||||
csi.ProbeVolumePlugins(),
|
csi.ProbeVolumePlugins(),
|
||||||
"fakeNode",
|
"fakeNode",
|
||||||
csiDriverLister,
|
csiDriverLister,
|
||||||
|
nil,
|
||||||
)
|
)
|
||||||
plugMgr := host.GetPluginMgr()
|
plugMgr := host.GetPluginMgr()
|
||||||
|
|
||||||
|
@ -351,6 +351,8 @@ type AttachDetachVolumeHost interface {
|
|||||||
// CSIDriverLister returns the informer lister for the CSIDriver API Object
|
// CSIDriverLister returns the informer lister for the CSIDriver API Object
|
||||||
CSIDriverLister() storagelistersv1.CSIDriverLister
|
CSIDriverLister() storagelistersv1.CSIDriverLister
|
||||||
|
|
||||||
|
// VolumeAttachmentLister returns the informer lister for the VolumeAttachment API Object
|
||||||
|
VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister
|
||||||
// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
|
// IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost
|
||||||
// to the attachDetachController
|
// to the attachDetachController
|
||||||
IsAttachDetachController() bool
|
IsAttachDetachController() bool
|
||||||
|
@ -100,49 +100,50 @@ const (
|
|||||||
|
|
||||||
// fakeVolumeHost is useful for testing volume plugins.
|
// fakeVolumeHost is useful for testing volume plugins.
|
||||||
type fakeVolumeHost struct {
|
type fakeVolumeHost struct {
|
||||||
rootDir string
|
rootDir string
|
||||||
kubeClient clientset.Interface
|
kubeClient clientset.Interface
|
||||||
pluginMgr *VolumePluginMgr
|
pluginMgr *VolumePluginMgr
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
mounter mount.Interface
|
mounter mount.Interface
|
||||||
hostUtil hostutil.HostUtils
|
hostUtil hostutil.HostUtils
|
||||||
exec *testingexec.FakeExec
|
exec *testingexec.FakeExec
|
||||||
nodeLabels map[string]string
|
nodeLabels map[string]string
|
||||||
nodeName string
|
nodeName string
|
||||||
subpather subpath.Interface
|
subpather subpath.Interface
|
||||||
csiDriverLister storagelistersv1.CSIDriverLister
|
csiDriverLister storagelistersv1.CSIDriverLister
|
||||||
informerFactory informers.SharedInformerFactory
|
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
|
||||||
kubeletErr error
|
informerFactory informers.SharedInformerFactory
|
||||||
mux sync.Mutex
|
kubeletErr error
|
||||||
|
mux sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ VolumeHost = &fakeVolumeHost{}
|
var _ VolumeHost = &fakeVolumeHost{}
|
||||||
var _ AttachDetachVolumeHost = &fakeVolumeHost{}
|
var _ AttachDetachVolumeHost = &fakeVolumeHost{}
|
||||||
|
|
||||||
func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
|
func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost {
|
||||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil)
|
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost {
|
func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost {
|
||||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil)
|
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost {
|
func NewFakeVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost {
|
||||||
volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil)
|
volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
|
||||||
volHost.nodeLabels = labels
|
volHost.nodeLabels = labels
|
||||||
return volHost
|
return volHost
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost {
|
func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost {
|
||||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister)
|
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost {
|
func NewFakeVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost {
|
||||||
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil)
|
return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost {
|
func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost {
|
||||||
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister}
|
host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister}
|
||||||
host.mounter = mount.NewFakeMounter(nil)
|
host.mounter = mount.NewFakeMounter(nil)
|
||||||
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
|
host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
|
||||||
host.exec = &testingexec.FakeExec{DisableScripts: true}
|
host.exec = &testingexec.FakeExec{DisableScripts: true}
|
||||||
@ -1840,6 +1841,10 @@ func (f *fakeVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
|
|||||||
return f.csiDriverLister
|
return f.csiDriverLister
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeVolumeHost) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
|
||||||
|
return f.volumeAttachmentLister
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeVolumeHost) CSIDriversSynced() cache.InformerSynced {
|
func (f *fakeVolumeHost) CSIDriversSynced() cache.InformerSynced {
|
||||||
// not needed for testing
|
// not needed for testing
|
||||||
return nil
|
return nil
|
||||||
|
@ -180,6 +180,7 @@ func TestPodDeletionWithDswp(t *testing.T) {
|
|||||||
// start controller loop
|
// start controller loop
|
||||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||||
|
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||||
initCSIObjects(stopCh, informers)
|
initCSIObjects(stopCh, informers)
|
||||||
go ctrl.Run(stopCh)
|
go ctrl.Run(stopCh)
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
@ -256,6 +257,7 @@ func TestPodUpdateWithWithADC(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||||
|
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||||
initCSIObjects(stopCh, informers)
|
initCSIObjects(stopCh, informers)
|
||||||
go ctrl.Run(stopCh)
|
go ctrl.Run(stopCh)
|
||||||
|
|
||||||
@ -325,6 +327,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||||
|
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||||
initCSIObjects(stopCh, informers)
|
initCSIObjects(stopCh, informers)
|
||||||
go ctrl.Run(stopCh)
|
go ctrl.Run(stopCh)
|
||||||
|
|
||||||
@ -429,6 +432,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||||||
informers.Core().V1().PersistentVolumes(),
|
informers.Core().V1().PersistentVolumes(),
|
||||||
informers.Storage().V1().CSINodes(),
|
informers.Storage().V1().CSINodes(),
|
||||||
informers.Storage().V1().CSIDrivers(),
|
informers.Storage().V1().CSIDrivers(),
|
||||||
|
informers.Storage().V1().VolumeAttachments(),
|
||||||
cloud,
|
cloud,
|
||||||
plugins,
|
plugins,
|
||||||
nil, /* prober */
|
nil, /* prober */
|
||||||
@ -504,6 +508,7 @@ func TestPodAddedByDswp(t *testing.T) {
|
|||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||||
|
go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh)
|
||||||
initCSIObjects(stopCh, informers)
|
initCSIObjects(stopCh, informers)
|
||||||
go ctrl.Run(stopCh)
|
go ctrl.Run(stopCh)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user