diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 12d2ea41cb3..6a45e70d76b 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" @@ -384,6 +385,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger for _, node := range nodes { nodeName := types.NodeName(node.Name) + volumesInUse := sets.New(node.Status.VolumesInUse...) + for _, attachedVolume := range node.Status.VolumesAttached { uniqueName := attachedVolume.Name // The nil VolumeSpec is safe only in the case the volume is not in use by any pod. @@ -396,9 +399,13 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger logger.Error(err, "Failed to mark the volume as attached") continue } - adc.processVolumesInUse(logger, nodeName, node.Status.VolumesInUse) - adc.addNodeToDswp(node, types.NodeName(node.Name)) + inUse := volumesInUse.Has(uniqueName) + err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse) + if err != nil { + logger.Error(err, "Failed to set volume mounted by node") + } } + adc.addNodeToDswp(node, types.NodeName(node.Name)) } err = adc.processVolumeAttachments(logger) if err != nil { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 3bb584dde47..5cb9fed9fff 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -26,7 +26,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" kcache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" @@ -73,44 +75,42 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { } } -func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) { +func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) { // Arrange fakeKubeClient := controllervolumetesting.CreateTestClient() informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) - podInformer := informerFactory.Core().V1().Pods() - nodeInformer := informerFactory.Core().V1().Nodes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - pvInformer := informerFactory.Core().V1().PersistentVolumes() - volumeAttachmentInformer := informerFactory.Storage().V1().VolumeAttachments() - adc := &attachDetachController{ - kubeClient: fakeKubeClient, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvsSynced: pvInformer.Informer().HasSynced, - podLister: podInformer.Lister(), - podsSynced: podInformer.Informer().HasSynced, - nodeLister: nodeInformer.Lister(), - nodesSynced: nodeInformer.Informer().HasSynced, - volumeAttachmentLister: volumeAttachmentInformer.Lister(), - volumeAttachmentSynced: volumeAttachmentInformer.Informer().HasSynced, - cloud: nil, + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + adcObj, err := NewAttachDetachController( + logger, + fakeKubeClient, + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1().CSINodes(), + informerFactory.Storage().V1().CSIDrivers(), + informerFactory.Storage().V1().VolumeAttachments(), + nil, /* cloud */ + controllervolumetesting.CreateTestPlugin(), + nil, /* prober */ + false, + 5*time.Second, + DefaultTimerConfig, + ) + + if err != nil { + t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) } + adc := adcObj.(*attachDetachController) // Act - plugins := controllervolumetesting.CreateTestPlugin() - var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) - if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { - t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err) - } - - adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr) - adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) - - logger, _ := ktesting.NewTestContext(t) - err := adc.populateActualStateOfWorld(logger) + err = adc.populateActualStateOfWorld(logger) if err != nil { t.Fatalf("Run failed with error. Expected: Actual: <%v>", err) } @@ -128,11 +128,22 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) { for _, node := range nodes { nodeName := types.NodeName(node.Name) + inUseVolumes := sets.New(node.Status.VolumesInUse...) + allAttachedVolumes := map[v1.UniqueVolumeName]cache.AttachedVolume{} + for _, v := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) { + allAttachedVolumes[v.VolumeName] = v + } + for _, attachedVolume := range node.Status.VolumesAttached { attachedState := adc.actualStateOfWorld.GetAttachState(attachedVolume.Name, nodeName) if attachedState != cache.AttachStateAttached { t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name) } + inUse := inUseVolumes.Has(attachedVolume.Name) + mounted := allAttachedVolumes[attachedVolume.Name].MountedByNode + if mounted != inUse { + t.Fatalf("Node %s, volume %s MountedByNode %v unexpected", nodeName, attachedVolume.Name, mounted) + } } } @@ -152,6 +163,84 @@ func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) { } } +func BenchmarkPopulateActualStateOfWorld(b *testing.B) { + // Arrange + fakeKubeClient := fake.NewSimpleClientset() + + // populate 10000 nodes, each with 100 volumes + for i := 0; i < 10000; i++ { + nodeName := fmt.Sprintf("node-%d", i) + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: map[string]string{ + "name": nodeName, + }, + Annotations: map[string]string{ + util.ControllerManagedAttachAnnotation: "true", + }, + }, + } + for j := 0; j < 100; j++ { + volumeName := v1.UniqueVolumeName(fmt.Sprintf("test-volume/vol-%d-%d", i, j)) + node.Status.VolumesAttached = append(node.Status.VolumesAttached, v1.AttachedVolume{ + Name: volumeName, + DevicePath: fmt.Sprintf("/dev/disk/by-id/vol-%d-%d", i, j), + }) + node.Status.VolumesInUse = append(node.Status.VolumesInUse, volumeName) + _, err := fakeKubeClient.CoreV1().PersistentVolumes().Create(context.Background(), &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vol-%d-%d", i, j), + }, + }, metav1.CreateOptions{}) + if err != nil { + b.Fatalf("failed to create PV: %v", err) + } + } + _, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) + if err != nil { + b.Fatalf("failed to create node: %v", err) + } + } + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + adcObj, err := NewAttachDetachController( + logger, + fakeKubeClient, + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1().CSINodes(), + informerFactory.Storage().V1().CSIDrivers(), + informerFactory.Storage().V1().VolumeAttachments(), + nil, /* cloud */ + nil, /* plugins */ + nil, /* prober */ + false, + 5*time.Second, + DefaultTimerConfig, + ) + + if err != nil { + b.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } + adc := adcObj.(*attachDetachController) + + // Act + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + b.ResetTimer() + err = adc.populateActualStateOfWorld(logger) + if err != nil { + b.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } +} + func Test_AttachDetachControllerRecovery(t *testing.T) { attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{}) newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1") diff --git a/pkg/controller/volume/attachdetach/testing/testvolumespec.go b/pkg/controller/volume/attachdetach/testing/testvolumespec.go index 0ce94935b27..262951d23a5 100644 --- a/pkg/controller/volume/attachdetach/testing/testvolumespec.go +++ b/pkg/controller/volume/attachdetach/testing/testvolumespec.go @@ -155,8 +155,9 @@ func CreateTestClient() *fake.Clientset { // We want also the "mynode" node since all the testing pods live there nodeName = nodeNamePrefix } - attachVolumeToNode(nodes, "lostVolumeName", nodeName) + attachVolumeToNode(nodes, "lostVolumeName", nodeName, false) } + attachVolumeToNode(nodes, "inUseVolume", nodeNamePrefix, true) fakeClient.AddReactor("update", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { updateAction := action.(core.UpdateAction) node := updateAction.GetObject().(*v1.Node) @@ -312,21 +313,18 @@ func NewNFSPV(pvName, volumeName string) *v1.PersistentVolume { } } -func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string) { +func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string, inUse bool) { // if nodeName exists, get the object.. if not create node object var node *v1.Node - found := false - nodes.Size() for i := range nodes.Items { - curNode := nodes.Items[i] + curNode := &nodes.Items[i] if curNode.ObjectMeta.Name == nodeName { - node = &curNode - found = true + node = curNode break } } - if !found { - node = &v1.Node{ + if node == nil { + nodes.Items = append(nodes.Items, v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName, Labels: map[string]string{ @@ -336,24 +334,19 @@ func attachVolumeToNode(nodes *v1.NodeList, volumeName, nodeName string) { util.ControllerManagedAttachAnnotation: "true", }, }, - Status: v1.NodeStatus{ - VolumesAttached: []v1.AttachedVolume{ - { - Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName), - DevicePath: "fake/path", - }, - }, - }, - } - } else { - volumeAttached := v1.AttachedVolume{ - Name: v1.UniqueVolumeName(TestPluginName + "/" + volumeName), - DevicePath: "fake/path", - } - node.Status.VolumesAttached = append(node.Status.VolumesAttached, volumeAttached) + }) + node = &nodes.Items[len(nodes.Items)-1] } + uniqueVolumeName := v1.UniqueVolumeName(TestPluginName + "/" + volumeName) + volumeAttached := v1.AttachedVolume{ + Name: uniqueVolumeName, + DevicePath: "fake/path", + } + node.Status.VolumesAttached = append(node.Status.VolumesAttached, volumeAttached) - nodes.Items = append(nodes.Items, *node) + if inUse { + node.Status.VolumesInUse = append(node.Status.VolumesInUse, uniqueVolumeName) + } } type TestPlugin struct {