diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 12d2ea41cb3..6a45e70d76b 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" @@ -384,6 +385,8 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger for _, node := range nodes { nodeName := types.NodeName(node.Name) + volumesInUse := sets.New(node.Status.VolumesInUse...) + for _, attachedVolume := range node.Status.VolumesAttached { uniqueName := attachedVolume.Name // The nil VolumeSpec is safe only in the case the volume is not in use by any pod. @@ -396,9 +399,13 @@ func (adc *attachDetachController) populateActualStateOfWorld(logger klog.Logger logger.Error(err, "Failed to mark the volume as attached") continue } - adc.processVolumesInUse(logger, nodeName, node.Status.VolumesInUse) - adc.addNodeToDswp(node, types.NodeName(node.Name)) + inUse := volumesInUse.Has(uniqueName) + err = adc.actualStateOfWorld.SetVolumeMountedByNode(logger, uniqueName, nodeName, inUse) + if err != nil { + logger.Error(err, "Failed to set volume mounted by node") + } } + adc.addNodeToDswp(node, types.NodeName(node.Name)) } err = adc.processVolumeAttachments(logger) if err != nil { diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 28d34f66d9d..5cb9fed9fff 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" kcache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" @@ -162,6 +163,84 @@ func Test_AttachDetachControllerStateOfWorldPopulators_Positive(t *testing.T) { } } +func BenchmarkPopulateActualStateOfWorld(b *testing.B) { + // Arrange + fakeKubeClient := fake.NewSimpleClientset() + + // populate 10000 nodes, each with 100 volumes + for i := 0; i < 10000; i++ { + nodeName := fmt.Sprintf("node-%d", i) + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: map[string]string{ + "name": nodeName, + }, + Annotations: map[string]string{ + util.ControllerManagedAttachAnnotation: "true", + }, + }, + } + for j := 0; j < 100; j++ { + volumeName := v1.UniqueVolumeName(fmt.Sprintf("test-volume/vol-%d-%d", i, j)) + node.Status.VolumesAttached = append(node.Status.VolumesAttached, v1.AttachedVolume{ + Name: volumeName, + DevicePath: fmt.Sprintf("/dev/disk/by-id/vol-%d-%d", i, j), + }) + node.Status.VolumesInUse = append(node.Status.VolumesInUse, volumeName) + _, err := fakeKubeClient.CoreV1().PersistentVolumes().Create(context.Background(), &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vol-%d-%d", i, j), + }, + }, metav1.CreateOptions{}) + if err != nil { + b.Fatalf("failed to create PV: %v", err) + } + } + _, err := fakeKubeClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) + if err != nil { + b.Fatalf("failed to create node: %v", err) + } + } + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + adcObj, err := NewAttachDetachController( + logger, + fakeKubeClient, + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Storage().V1().CSINodes(), + informerFactory.Storage().V1().CSIDrivers(), + informerFactory.Storage().V1().VolumeAttachments(), + nil, /* cloud */ + nil, /* plugins */ + nil, /* prober */ + false, + 5*time.Second, + DefaultTimerConfig, + ) + + if err != nil { + b.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } + adc := adcObj.(*attachDetachController) + + // Act + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + b.ResetTimer() + err = adc.populateActualStateOfWorld(logger) + if err != nil { + b.Fatalf("Run failed with error. Expected: Actual: <%v>", err) + } +} + func Test_AttachDetachControllerRecovery(t *testing.T) { attachDetachRecoveryTestCase(t, []*v1.Pod{}, []*v1.Pod{}) newPod1 := controllervolumetesting.NewPodWithVolume("newpod-1", "volumeName2", "mynode-1")