diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index e0178415cd7..c36a1069ee7 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -332,6 +332,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err ctx.InformerFactory.Core().V1().PersistentVolumes(), csiNodeInformer, csiDriverInformer, + ctx.InformerFactory.Storage().V1().VolumeAttachments(), ctx.Cloud, plugins, GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index ad4af05853e..79b2bdfe4b3 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -111,6 +111,7 @@ func NewAttachDetachController( pvInformer coreinformers.PersistentVolumeInformer, csiNodeInformer storageinformersv1.CSINodeInformer, csiDriverInformer storageinformersv1.CSIDriverInformer, + volumeAttachmentInformer storageinformersv1.VolumeAttachmentInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin, prober volume.DynamicPluginProber, @@ -142,6 +143,9 @@ func NewAttachDetachController( adc.csiDriverLister = csiDriverInformer.Lister() adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced + adc.volumeAttachmentLister = volumeAttachmentInformer.Lister() + adc.volumeAttachmentSynced = volumeAttachmentInformer.Informer().HasSynced + if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) } @@ -254,6 +258,12 @@ type attachDetachController struct { csiDriverLister storagelistersv1.CSIDriverLister csiDriversSynced kcache.InformerSynced + // volumeAttachmentLister is the shared volumeAttachment lister used to fetch and store + // VolumeAttachment objects from the API server. It is shared with other controllers + // and therefore the VolumeAttachment objects in its store should be treated as immutable. + volumeAttachmentLister storagelistersv1.VolumeAttachmentLister + volumeAttachmentSynced kcache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -319,6 +329,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { if adc.csiDriversSynced != nil { synced = append(synced, adc.csiDriversSynced) } + if adc.volumeAttachmentSynced != nil { + synced = append(synced, adc.volumeAttachmentSynced) + } if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) { return @@ -675,6 +688,10 @@ func (adc *attachDetachController) IsAttachDetachController() bool { return true } +func (adc *attachDetachController) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister { + return adc.volumeAttachmentLister +} + // VolumeHost implementation // This is an unfortunate requirement of the current factoring of volume plugin // initializing code. It requires kubelet specific methods used by the mounting diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index ec40e5887a7..16988563438 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -48,6 +48,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { informerFactory.Core().V1().PersistentVolumes(), informerFactory.Storage().V1().CSINodes(), informerFactory.Storage().V1().CSIDrivers(), + informerFactory.Storage().V1().VolumeAttachments(), nil, /* cloud */ nil, /* plugins */ nil, /* prober */ @@ -168,6 +169,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Core().V1().PersistentVolumes(), informerFactory.Storage().V1().CSINodes(), informerFactory.Storage().V1().CSIDrivers(), + informerFactory.Storage().V1().VolumeAttachments(), nil, /* cloud */ plugins, prober, diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index 8a248d18c41..eb5c727f7a7 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -79,8 +79,10 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go index 6b4b14acf45..e429dc9a645 100644 --- a/pkg/volume/csi/csi_attacher.go +++ b/pkg/volume/csi/csi_attacher.go @@ -197,8 +197,19 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No } attachID := getAttachmentName(volumeHandle, driverName, string(nodeName)) + var attach *storage.VolumeAttachment + if c.plugin.volumeAttachmentLister != nil { + attach, err = c.plugin.volumeAttachmentLister.Get(attachID) + if err == nil { + attached[spec] = attach.Status.Attached + continue + } + klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err)) + } + // The cache lookup is not setup or the object is not found in the cache. + // Get the object from the API server. klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID)) - attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{}) + attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{}) if err != nil { attached[spec] = false klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err)) diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 1486ac161a8..d469f3f9049 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -38,8 +38,10 @@ import ( "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + storageinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" + storagelister "k8s.io/client-go/listers/storage/v1" core "k8s.io/client-go/testing" utiltesting "k8s.io/client-go/util/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -106,7 +108,9 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R if err != nil { t.Error(err) } - watch.Modify(attach) + if watch != nil { + watch.Modify(attach) + } } } @@ -197,7 +201,7 @@ func TestAttacherAttach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -281,7 +285,7 @@ func TestAttacherAttachWithInline(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("test case: %s", tc.name) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -349,7 +353,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { getTestCSIDriver("attachable", nil, &bTrue, nil), getTestCSIDriver("nil", nil, nil, nil), ) - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeClient) + plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -390,7 +394,7 @@ func TestAttacherWithCSIDriver(t *testing.T) { status := storage.VolumeAttachmentStatus{ Attached: true, } - markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status) + markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status) } wg.Wait() }) @@ -515,7 +519,7 @@ func TestAttacherWaitForAttach(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -597,7 +601,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -684,7 +688,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) attacher, err := plug.NewAttacher() @@ -941,7 +945,7 @@ func TestAttacherDetach(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("running test: %v", tc.name) - plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) if tc.reactor != nil { client.PrependReactor("*", "*", tc.reactor) @@ -998,7 +1002,7 @@ func TestAttacherDetach(t *testing.T) { func TestAttacherGetDeviceMountPath(t *testing.T) { // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1163,7 +1167,7 @@ func TestAttacherMountDevice(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1314,7 +1318,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) { // Setup // Create a new attacher - plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1442,7 +1446,7 @@ func TestAttacherUnmountDevice(t *testing.T) { t.Logf("Running test case: %s", tc.testName) // Setup // Create a new attacher - plug, _, tmpDir, _ := newTestWatchPlugin(t, nil) + plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true) defer os.RemoveAll(tmpDir) attacher, err0 := plug.NewAttacher() if err0 != nil { @@ -1529,7 +1533,7 @@ func TestAttacherUnmountDevice(t *testing.T) { } // create a plugin mgr to load plugins and setup a fake client -func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) { +func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) { tmpDir, err := utiltesting.MkTmpdir("csi-test") if err != nil { t.Fatalf("can't create temp dir: %v", err) @@ -1545,12 +1549,24 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu Spec: v1.NodeSpec{}, }) fakeWatcher := watch.NewRaceFreeFake() - fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + if !setupInformer { + // TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer + // and the csiAttacher.Attach both endup reading from same channel causing hang in Attach(). + // So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer. + fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) + } // Start informer for CSIDrivers. factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() + var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer + var volumeAttachmentLister storagelister.VolumeAttachmentLister + if setupInformer { + volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments() + volumeAttachmentLister = volumeAttachmentInformer.Lister() + } + factory.Start(wait.NeverStop) host := volumetest.NewFakeVolumeHostWithCSINodeName(t, @@ -1559,6 +1575,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu ProbeVolumePlugins(), "fakeNode", csiDriverLister, + volumeAttachmentLister, ) plugMgr := host.GetPluginMgr() @@ -1577,5 +1594,10 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu return csiDriverInformer.Informer().HasSynced(), nil }) + if volumeAttachmentInformer != nil { + wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { + return volumeAttachmentInformer.Informer().HasSynced(), nil + }) + } return csiPlug, fakeWatcher, tmpDir, fakeClient } diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index ea3f5b0bae7..61da1c3f2ca 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -59,9 +59,10 @@ const ( ) type csiPlugin struct { - host volume.VolumeHost - blockEnabled bool - csiDriverLister storagelisters.CSIDriverLister + host volume.VolumeHost + blockEnabled bool + csiDriverLister storagelisters.CSIDriverLister + volumeAttachmentLister storagelisters.VolumeAttachmentLister } // ProbeVolumePlugins returns implemented plugins @@ -186,13 +187,17 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { if csiClient == nil { klog.Warning(log("kubeclient not set, assuming standalone kubelet")) } else { - // set CSIDriverLister + // set CSIDriverLister and volumeAttachmentLister adcHost, ok := host.(volume.AttachDetachVolumeHost) if ok { p.csiDriverLister = adcHost.CSIDriverLister() if p.csiDriverLister == nil { klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost")) } + p.volumeAttachmentLister = adcHost.VolumeAttachmentLister() + if p.volumeAttachmentLister == nil { + klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost")) + } } kletHost, ok := host.(volume.KubeletVolumeHost) if ok { @@ -200,6 +205,8 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error { if p.csiDriverLister == nil { klog.Error(log("CSIDriverLister not found on KubeletVolumeHost")) } + // We don't run the volumeAttachmentLister in the kubelet context + p.volumeAttachmentLister = nil } } diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index df2f7da1e70..571f038a20c 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -62,6 +62,8 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() + volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() + volumeAttachmentLister := volumeAttachmentInformer.Lister() go factory.Start(wait.NeverStop) host := volumetest.NewFakeVolumeHostWithCSINodeName(t, @@ -70,6 +72,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri ProbeVolumePlugins(), "fakeNode", csiDriverLister, + volumeAttachmentLister, ) pluginMgr := host.GetPluginMgr() @@ -88,6 +91,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri return csiDriverInformer.Informer().HasSynced(), nil }) + wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { + return volumeAttachmentInformer.Informer().HasSynced(), nil + }) return csiPlug, tmpDir } @@ -1018,6 +1024,7 @@ func TestPluginFindAttachablePlugin(t *testing.T) { ProbeVolumePlugins(), "fakeNode", factory.Storage().V1().CSIDrivers().Lister(), + factory.Storage().V1().VolumeAttachments().Lister(), ) plugMgr := host.GetPluginMgr() @@ -1137,7 +1144,7 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) { Spec: v1.NodeSpec{}, }, ) - host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil) + host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil, nil) plugMgr := host.GetPluginMgr() plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec) if err != nil && !test.shouldFail { diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 0675b8ebde4..f4e09be2bc1 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -250,6 +250,7 @@ func TestCSI_VolumeAll(t *testing.T) { factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */) csiDriverInformer := factory.Storage().V1().CSIDrivers() + volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments() if driverInfo != nil { csiDriverInformer.Informer().GetStore().Add(driverInfo) } @@ -261,6 +262,7 @@ func TestCSI_VolumeAll(t *testing.T) { ProbeVolumePlugins(), "fakeNode", csiDriverInformer.Lister(), + volumeAttachmentInformer.Lister(), ) plugMgr := host.GetPluginMgr() csiClient := setupClient(t, true) diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go index cd8e836fa2e..bacf068ada9 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go @@ -969,6 +969,7 @@ func TestInstallCSIDriverExistingAnnotation(t *testing.T) { nil, nodeName, nil, + nil, ) nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) @@ -1030,6 +1031,7 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t nil, nodeName, nil, + nil, ) nim := NewNodeInfoManager(types.NodeName(nodeName), host, nil) diff --git a/pkg/volume/csi/testing/testing.go b/pkg/volume/csi/testing/testing.go index b15b265d7a6..2c0a26169f3 100644 --- a/pkg/volume/csi/testing/testing.go +++ b/pkg/volume/csi/testing/testing.go @@ -60,6 +60,7 @@ func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePl csi.ProbeVolumePlugins(), "fakeNode", csiDriverLister, + nil, ) plugMgr := host.GetPluginMgr() diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 01fc6dfdefd..c4546a4452d 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -351,6 +351,8 @@ type AttachDetachVolumeHost interface { // CSIDriverLister returns the informer lister for the CSIDriver API Object CSIDriverLister() storagelistersv1.CSIDriverLister + // VolumeAttachmentLister returns the informer lister for the VolumeAttachment API Object + VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister // IsAttachDetachController is an interface marker to strictly tie AttachDetachVolumeHost // to the attachDetachController IsAttachDetachController() bool diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 18ce9cfd6eb..4ce7962328b 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -100,49 +100,50 @@ const ( // fakeVolumeHost is useful for testing volume plugins. type fakeVolumeHost struct { - rootDir string - kubeClient clientset.Interface - pluginMgr *VolumePluginMgr - cloud cloudprovider.Interface - mounter mount.Interface - hostUtil hostutil.HostUtils - exec *testingexec.FakeExec - nodeLabels map[string]string - nodeName string - subpather subpath.Interface - csiDriverLister storagelistersv1.CSIDriverLister - informerFactory informers.SharedInformerFactory - kubeletErr error - mux sync.Mutex + rootDir string + kubeClient clientset.Interface + pluginMgr *VolumePluginMgr + cloud cloudprovider.Interface + mounter mount.Interface + hostUtil hostutil.HostUtils + exec *testingexec.FakeExec + nodeLabels map[string]string + nodeName string + subpather subpath.Interface + csiDriverLister storagelistersv1.CSIDriverLister + volumeAttachmentLister storagelistersv1.VolumeAttachmentLister + informerFactory informers.SharedInformerFactory + kubeletErr error + mux sync.Mutex } var _ VolumeHost = &fakeVolumeHost{} var _ AttachDetachVolumeHost = &fakeVolumeHost{} func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeVolumeHost { - return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil) + return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil) } func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeVolumeHost { - return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil) + return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil) } func NewFakeVolumeHostWithNodeLabels(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, labels map[string]string) *fakeVolumeHost { - volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil) + volHost := newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil) volHost.nodeLabels = labels return volHost } -func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost { - return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister) +func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost { + return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister) } func NewFakeVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeVolumeHost { - return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil) + return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil) } -func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister) *fakeVolumeHost { - host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister} +func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeVolumeHost { + host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister} host.mounter = mount.NewFakeMounter(nil) host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap) host.exec = &testingexec.FakeExec{DisableScripts: true} @@ -1840,6 +1841,10 @@ func (f *fakeVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister { return f.csiDriverLister } +func (f *fakeVolumeHost) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister { + return f.volumeAttachmentLister +} + func (f *fakeVolumeHost) CSIDriversSynced() cache.InformerSynced { // not needed for testing return nil diff --git a/test/integration/volume/attach_detach_test.go b/test/integration/volume/attach_detach_test.go index 410191676c9..4e6d0ffb9f4 100644 --- a/test/integration/volume/attach_detach_test.go +++ b/test/integration/volume/attach_detach_test.go @@ -180,6 +180,7 @@ func TestPodDeletionWithDswp(t *testing.T) { // start controller loop go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) defer close(stopCh) @@ -256,6 +257,7 @@ func TestPodUpdateWithWithADC(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) @@ -325,6 +327,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) initCSIObjects(stopCh, informers) go ctrl.Run(stopCh) @@ -429,6 +432,7 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy informers.Core().V1().PersistentVolumes(), informers.Storage().V1().CSINodes(), informers.Storage().V1().CSIDrivers(), + informers.Storage().V1().VolumeAttachments(), cloud, plugins, nil, /* prober */ @@ -504,6 +508,7 @@ func TestPodAddedByDswp(t *testing.T) { stopCh := make(chan struct{}) go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh) go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh) + go informers.Storage().V1().VolumeAttachments().Informer().Run(stopCh) initCSIObjects(stopCh, informers) go ctrl.Run(stopCh)