From eb16aa1d4a5d36482bd58da6737364ef81759002 Mon Sep 17 00:00:00 2001 From: Hung Nguyen Date: Thu, 8 Aug 2024 16:57:55 +0000 Subject: [PATCH] improve PVC Protection Controller's processing mechanism with sample performance test --- .../pvc_protection_controller.go | 158 ++++++++--- .../pvc_protection_controller_test.go | 160 +++++++++-- test/e2e/storage/testsuites/base.go | 1 + .../e2e/storage/testsuites/pvcdeletionperf.go | 248 ++++++++++++++++++ 4 files changed, 513 insertions(+), 54 deletions(-) create mode 100644 test/e2e/storage/testsuites/pvcdeletionperf.go diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index 9e8614bc0cf..550c1da4063 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -19,6 +19,7 @@ package pvcprotection import ( "context" "fmt" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -41,6 +42,65 @@ import ( // Controller is controller that removes PVCProtectionFinalizer // from PVCs that are used by no pods. + +type LazyLivePodList struct { + cache []v1.Pod + controller *Controller +} + +func (ll *LazyLivePodList) getCache() []v1.Pod { + return ll.cache +} + +func (ll *LazyLivePodList) setCache(pods []v1.Pod) { + ll.cache = pods +} + +type pvcData struct { + pvcKey string + pvcName string +} + +type pvcProcessingStore struct { + namespaceToPVCsMap map[string][]pvcData + namespaceQueue workqueue.TypedInterface[string] + mu sync.Mutex +} + +func NewPVCProcessingStore() *pvcProcessingStore { + return &pvcProcessingStore{ + namespaceToPVCsMap: make(map[string][]pvcData), + namespaceQueue: workqueue.NewTyped[string](), + } +} + +func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) { + m.mu.Lock() + defer m.mu.Unlock() + if _, exists := m.namespaceToPVCsMap[namespace]; !exists { + m.namespaceToPVCsMap[namespace] = make([]pvcData, 0) + m.namespaceQueue.Add(namespace) + } + m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName}) +} + +// Returns a list of pvcs and the associated namespace to be processed downstream +func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) { + + nextNamespace, quit := m.namespaceQueue.Get() + if quit { + return nil, nextNamespace + } + + m.mu.Lock() + defer m.mu.Unlock() + pvcs := m.namespaceToPVCsMap[nextNamespace] + + delete(m.namespaceToPVCsMap, nextNamespace) + m.namespaceQueue.Done(nextNamespace) + return pvcs, nextNamespace +} + type Controller struct { client clientset.Interface @@ -51,7 +111,8 @@ type Controller struct { podListerSynced cache.InformerSynced podIndexer cache.Indexer - queue workqueue.TypedRateLimitingInterface[string] + queue workqueue.TypedRateLimitingInterface[string] + pvcProcessingStore *pvcProcessingStore } // NewPVCProtectionController returns a new instance of PVCProtectionController. @@ -62,6 +123,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"}, ), + pvcProcessingStore: NewPVCProcessingStore(), } e.pvcLister = pvcInformer.Lister() @@ -100,6 +162,7 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe func (c *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() + defer c.pvcProcessingStore.namespaceQueue.ShutDown() logger := klog.FromContext(ctx) logger.Info("Starting PVC protection controller") @@ -109,45 +172,64 @@ func (c *Controller) Run(ctx context.Context, workers int) { return } + go wait.UntilWithContext(ctx, c.runMainWorker, time.Second) for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) + go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second) } <-ctx.Done() } -func (c *Controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { +// Main worker batch-pulls PVC items off informer's work queue and populates namespace queue and namespace-PVCs map +func (c *Controller) runMainWorker(ctx context.Context) { + for c.processNextWorkItem() { } } -// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit. -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - pvcKey, quit := c.queue.Get() - if quit { +// Consumer worker pulls items off namespace queue and processes associated PVCs +func (c *Controller) runProcessNamespaceWorker(ctx context.Context) { + for c.processPVCsByNamespace(ctx) { + } +} + +func (c *Controller) processNextWorkItem() bool { + queueLength := c.queue.Len() + for i := 0; i < queueLength; i++ { + pvcKey, quit := c.queue.Get() + if quit { + return false + } + pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err)) + } + c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName) + } + return !c.queue.ShuttingDown() +} + +func (c *Controller) processPVCsByNamespace(ctx context.Context) bool { + pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace() + if pvcList == nil { return false } - defer c.queue.Done(pvcKey) - pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %v", pvcKey, err)) - return true + lazyLivePodList := &LazyLivePodList{controller: c} + for _, item := range pvcList { + pvcKey, pvcName := item.pvcKey, item.pvcName + err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList) + if err == nil { + c.queue.Forget(pvcKey) + } else { + c.queue.AddRateLimited(pvcKey) + utilruntime.HandleError(fmt.Errorf("PVC %v in namespace %v failed with: %w", pvcName, namespace, err)) + } + c.queue.Done(pvcKey) } - - err = c.processPVC(ctx, pvcNamespace, pvcName) - if err == nil { - c.queue.Forget(pvcKey) - return true - } - - utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %v", pvcKey, err)) - c.queue.AddRateLimited(pvcKey) - return true } -func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error { +func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error { logger := klog.FromContext(ctx) logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName)) startTime := time.Now() @@ -167,7 +249,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) { // PVC should be deleted. Check if it's used and remove finalizer if // it's not. - isUsed, err := c.isBeingUsed(ctx, pvc) + isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList) if err != nil { return err } @@ -209,11 +291,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu logger.Error(err, "Error removing protection finalizer from PVC", "PVC", klog.KObj(pvc)) return err } - logger.V(3).Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc)) + logger.Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc)) return nil } -func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) { +func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) { // Look for a Pod using pvc in the Informer's cache. If one is found the // correct decision to keep pvc is taken without doing an expensive live // list. @@ -229,7 +311,9 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl // mean such a Pod doesn't exist: it might just not be in the cache yet. To // be 100% confident that it is safe to delete pvc make sure no Pod is using // it among those returned by a live list. - return c.askAPIServer(ctx, pvc) + + // Use lazy live pod list instead of directly calling API server + return c.askAPIServer(ctx, pvc, lazyLivePodList) } func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) { @@ -258,16 +342,24 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla return false, nil } -func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) { +func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) { logger := klog.FromContext(ctx) logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc)) + if lazyLivePodList.getCache() == nil { + podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{}) - podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - return false, fmt.Errorf("live list of pods failed: %s", err.Error()) + if err != nil { + return false, fmt.Errorf("live list of pods failed: %s", err.Error()) + } + + if podsList.Items == nil { + lazyLivePodList.setCache(make([]v1.Pod, 0)) + } else { + lazyLivePodList.setCache(podsList.Items) + } } - for _, pod := range podsList.Items { + for _, pod := range lazyLivePodList.getCache() { if c.podUsesPVC(logger, &pod, pvc) { return true, nil } diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go index a75c752eaa9..dd5549ebb05 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller_test.go @@ -47,6 +47,7 @@ type reaction struct { const ( defaultNS = "default" + namespace2 = "namespace-2" defaultPVCName = "pvc1" defaultPodName = "pod1" defaultNodeName = "node1" @@ -69,6 +70,22 @@ func pod() *v1.Pod { } } +func podWithConfig(name string, namespace string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + UID: defaultUID, + }, + Spec: v1.PodSpec{ + NodeName: defaultNodeName, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } +} + func unscheduled(pod *v1.Pod) *v1.Pod { pod.Spec.NodeName = "" return pod @@ -117,6 +134,15 @@ func pvc() *v1.PersistentVolumeClaim { } } +func pvcWithConfig(name string, namespace string) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } +} + func withProtectionFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { pvc.Finalizers = append(pvc.Finalizers, volumeutil.PVCProtectionFinalizer) return pvc @@ -146,6 +172,23 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF } } +func generatePodListErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + if i <= failures { + // Pod List fails + list, ok := action.(clienttesting.ListAction) + if !ok { + t.Fatalf("Reactor got non-list action: %+v", action) + } + return true, nil, apierrors.NewForbidden(list.GetResource().GroupResource(), "mock pod", errors.New("Mock error")) + } + // List succeeds + return false, nil, nil + } +} + func TestPVCProtectionController(t *testing.T) { pvcGVR := schema.GroupVersionResource{ Group: v1.GroupName, @@ -175,7 +218,7 @@ func TestPVCProtectionController(t *testing.T) { reactors []reaction // PVC event to simulate. This PVC will be automatically added to // initialObjects. - updatedPVC *v1.PersistentVolumeClaim + updatedPVCs []*v1.PersistentVolumeClaim // Pod event to simulate. This Pod will be automatically added to // initialObjects. updatedPod *v1.Pod @@ -190,20 +233,21 @@ func TestPVCProtectionController(t *testing.T) { // PVC events // { - name: "PVC without finalizer -> finalizer is added", - updatedPVC: pvc(), + name: "PVC without finalizer -> finalizer is added", + updatedPVCs: []*v1.PersistentVolumeClaim{pvc(), pvcWithConfig("pvc2", "namespace-2")}, expectedActions: []clienttesting.Action{ clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())), + clienttesting.NewUpdateAction(pvcGVR, "namespace-2", withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))), }, }, { name: "PVC with finalizer -> no action", - updatedPVC: withProtectionFinalizer(pvc()), + updatedPVCs: []*v1.PersistentVolumeClaim{withProtectionFinalizer(pvc()), withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))}, expectedActions: []clienttesting.Action{}, }, { - name: "saving PVC finalizer fails -> controller retries", - updatedPVC: pvc(), + name: "saving PVC finalizer fails -> controller retries", + updatedPVCs: []*v1.PersistentVolumeClaim{pvc()}, reactors: []reaction{ { verb: "update", @@ -221,16 +265,29 @@ func TestPVCProtectionController(t *testing.T) { }, }, { - name: "deleted PVC with finalizer -> finalizer is removed", - updatedPVC: deleted(withProtectionFinalizer(pvc())), + name: "deleted PVC with finalizers across different namespaces -> finalizer is removed", + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), + deleted(withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2")))}, expectedActions: []clienttesting.Action{ clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewListAction(podGVR, podGVK, "namespace-2", metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, "namespace-2", deleted(pvcWithConfig("pvc2", "namespace-2"))), }, }, { - name: "finalizer removal fails -> controller retries", - updatedPVC: deleted(withProtectionFinalizer(pvc())), + name: "multiple PVCs with finalizer for the same namespace; no alive pods -> finalizer is removed and only one API pod list is made", + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), + deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))}, + expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))), + }, + }, + { + name: "finalizer removal fails -> controller retries", + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))}, reactors: []reaction{ { verb: "update", @@ -250,12 +307,33 @@ func TestPVCProtectionController(t *testing.T) { clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, }, + { + name: "delete multiple PVCs of the same namespace; pod list fails for one PVC -> add failing PVC back to queue and continue to the next PVC", + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))}, + reactors: []reaction{ + { + verb: "list", + resource: "pods", + reactorfn: generatePodListErrorFunc(t, 1 /* update fails twice*/), + }, + }, + expectedActions: []clienttesting.Action{ + // Fails + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + // Succeed with next pvc in queue + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))), + + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + }, + }, { name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed", initialObjects: []runtime.Object{ withPVC(defaultPVCName, pod()), }, - updatedPVC: deleted(withProtectionFinalizer(pvc())), + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))}, expectedActions: []clienttesting.Action{}, }, { @@ -263,18 +341,34 @@ func TestPVCProtectionController(t *testing.T) { initialObjects: []runtime.Object{ withEmptyDir(withPVC("unrelatedPVC", pod())), }, - updatedPVC: deleted(withProtectionFinalizer(pvc())), + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))}, expectedActions: []clienttesting.Action{ clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), }, }, + { + name: "deleted multiple PVCs with finalizer (same namespace) + pod with unrelated PVC and EmptyDir exists -> only one live pod list & finalizers are removed", + initialObjects: []runtime.Object{ + withEmptyDir(withPVC("unrelatedPVC", pod())), + }, + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), + deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))), + deleted(withProtectionFinalizer(pvcWithConfig("pvc3", defaultNS))), + }, + expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc3", defaultNS))), + }, + }, { name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed", initialObjects: []runtime.Object{ withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())), }, - updatedPVC: deleted(withProtectionFinalizer(pvc())), + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))}, expectedActions: []clienttesting.Action{}, }, { @@ -283,11 +377,26 @@ func TestPVCProtectionController(t *testing.T) { withPVC(defaultPVCName, pod()), }, informersAreLate: true, - updatedPVC: deleted(withProtectionFinalizer(pvc())), + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))}, expectedActions: []clienttesting.Action{ clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), }, }, + + { + name: "mix of deleted PVCs with some used by a pod and some unused -> finalizer is removed only for unused PVCs", + initialObjects: []runtime.Object{ + withPVC(defaultPVCName, pod()), + withPVC("pvc3", podWithConfig("pod2", "namespace-3")), + }, + informersAreLate: true, + updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))), deleted(withProtectionFinalizer(pvcWithConfig("pvc3", "namespace-3")))}, + expectedActions: []clienttesting.Action{ + clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}), + clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))), + clienttesting.NewListAction(podGVR, podGVK, "namespace-3", metav1.ListOptions{}), + }, + }, // // Pod events // @@ -376,9 +485,11 @@ func TestPVCProtectionController(t *testing.T) { clientObjs []runtime.Object informersObjs []runtime.Object ) - if test.updatedPVC != nil { - clientObjs = append(clientObjs, test.updatedPVC) - informersObjs = append(informersObjs, test.updatedPVC) + if test.updatedPVCs != nil { + for i := 0; i < len(test.updatedPVCs); i++ { + clientObjs = append(clientObjs, test.updatedPVCs[i]) + informersObjs = append(informersObjs, test.updatedPVCs[i]) + } } if test.updatedPod != nil { clientObjs = append(clientObjs, test.updatedPod) @@ -388,7 +499,6 @@ func TestPVCProtectionController(t *testing.T) { if !test.informersAreLate { informersObjs = append(informersObjs, test.initialObjects...) } - // Create client with initial data client := fake.NewSimpleClientset(clientObjs...) @@ -423,8 +533,10 @@ func TestPVCProtectionController(t *testing.T) { } // Start the test by simulating an event - if test.updatedPVC != nil { - ctrl.pvcAddedUpdated(logger, test.updatedPVC) + if test.updatedPVCs != nil { + for i := 0; i < len(test.updatedPVCs); i++ { + ctrl.pvcAddedUpdated(logger, test.updatedPVCs[i]) + } } switch { case test.deletedPod != nil && test.updatedPod != nil && test.deletedPod.Namespace == test.updatedPod.Namespace && test.deletedPod.Name == test.updatedPod.Name: @@ -445,12 +557,18 @@ func TestPVCProtectionController(t *testing.T) { } if ctrl.queue.Len() > 0 { logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len()) - ctrl.processNextWorkItem(context.TODO()) + ctx := context.TODO() + ctrl.processNextWorkItem() + for ctrl.pvcProcessingStore.namespaceQueue.Len() != 0 { + ctrl.processPVCsByNamespace(ctx) + } } + if ctrl.queue.Len() > 0 { // There is still some work in the queue, process it now continue } + currentActionCount := len(client.Actions()) if currentActionCount < len(test.expectedActions) { // Do not log every wait, only when the action count changes. diff --git a/test/e2e/storage/testsuites/base.go b/test/e2e/storage/testsuites/base.go index a6efd496512..067e86c61cb 100644 --- a/test/e2e/storage/testsuites/base.go +++ b/test/e2e/storage/testsuites/base.go @@ -81,6 +81,7 @@ var CSISuites = append(BaseSuites, InitSnapshottableTestSuite, InitSnapshottableStressTestSuite, InitVolumePerformanceTestSuite, + InitPvcDeletionPerformanceTestSuite, InitReadWriteOncePodTestSuite, InitVolumeModifyTestSuite, ) diff --git a/test/e2e/storage/testsuites/pvcdeletionperf.go b/test/e2e/storage/testsuites/pvcdeletionperf.go new file mode 100644 index 00000000000..018e225a2d2 --- /dev/null +++ b/test/e2e/storage/testsuites/pvcdeletionperf.go @@ -0,0 +1,248 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testsuites + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/onsi/ginkgo/v2" + + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2epv "k8s.io/kubernetes/test/e2e/framework/pv" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + storageframework "k8s.io/kubernetes/test/e2e/storage/framework" + admissionapi "k8s.io/pod-security-admission/api" +) + +type pvcDeletionPerformanceTestSuite struct { + tsInfo storageframework.TestSuiteInfo +} + +var _ storageframework.TestSuite = &pvcDeletionPerformanceTestSuite{} + +const pvcDeletionTestTimeout = 30 * time.Minute + +// InitPvcDeletionPerformanceTestSuite returns pvcDeletionPerformanceTestSuite that implements TestSuite interface +// This test suite brings up a number of pods and PVCS (configured upstream), deletes the pods, and then deletes the PVCs. +// The main goal is to record the duration for the PVC/PV deletion process for each run, and so the test doesn't set explicit expectations to match against. +func InitPvcDeletionPerformanceTestSuite() storageframework.TestSuite { + return &pvcDeletionPerformanceTestSuite{ + tsInfo: storageframework.TestSuiteInfo{ + Name: "pvc-deletion-performance", + TestPatterns: []storageframework.TestPattern{ + storageframework.BlockVolModeDynamicPV, + }, + }, + } +} + +func (t *pvcDeletionPerformanceTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo { + return t.tsInfo +} + +func (t *pvcDeletionPerformanceTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) { +} + +func (t *pvcDeletionPerformanceTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) { + type local struct { + config *storageframework.PerTestConfig + cs clientset.Interface + ns *v1.Namespace + scName string + pvcs []*v1.PersistentVolumeClaim + options *storageframework.PerformanceTestOptions + stopCh chan struct{} + pods []*v1.Pod + } + var ( + dInfo *storageframework.DriverInfo + l *local + ) + ginkgo.BeforeEach(func() { + // Check preconditions + dDriver := driver.(storageframework.DynamicPVTestDriver) + if dDriver == nil { + e2eskipper.Skipf("Test driver does not support dynamically created volumes") + } + dInfo = dDriver.GetDriverInfo() + if dInfo == nil { + e2eskipper.Skipf("Failed to get Driver info -- skipping") + } + if dInfo.PerformanceTestOptions == nil || dInfo.PerformanceTestOptions.ProvisioningOptions == nil { + e2eskipper.Skipf("Driver %s doesn't specify performance test options -- skipping", dInfo.Name) + } + }) + + // Set high QPS for the framework to avoid client-side throttling from the test itself, + // which can interfere with measuring deletion time + frameworkOptions := framework.Options{ + ClientQPS: 500, + ClientBurst: 1000, + } + f := framework.NewFramework("pvc-deletion-performance", frameworkOptions, nil) + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + ginkgo.AfterEach(func(ctx context.Context) { + if l != nil { + if l.stopCh != nil { + ginkgo.By("Closing informer channel") + close(l.stopCh) + } + deletingStats := &performanceStats{ + mutex: &sync.Mutex{}, + perObjectInterval: make(map[string]*interval), + operationMetrics: &storageframework.Metrics{}, + } + var ( + errs []error + mu sync.Mutex + wg sync.WaitGroup + ) + + wg.Add(len(l.pods)) + for _, pod := range l.pods { + go func(pod *v1.Pod) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + framework.Logf("Deleting pod %v", pod.Name) + err := e2epod.DeletePodWithWait(ctx, l.cs, pod) + mu.Lock() + defer mu.Unlock() + errs = append(errs, err) + }(pod) + } + wg.Wait() + + ginkgo.By("Deleting all PVCs") + + startTime := time.Now() + wg.Add(len(l.pvcs)) + for _, pvc := range l.pvcs { + go func(pvc *v1.PersistentVolumeClaim) { // Start a goroutine for each PVC + defer wg.Done() // Decrement the counter when the goroutine finishes + startDeletingPvcTime := time.Now() + framework.Logf("Start deleting PVC %v", pvc.GetName()) + deletingStats.mutex.Lock() + deletingStats.perObjectInterval[pvc.Name] = &interval{ + create: startDeletingPvcTime, + } + deletingStats.mutex.Unlock() + err := e2epv.DeletePersistentVolumeClaim(ctx, l.cs, pvc.Name, pvc.Namespace) + framework.ExpectNoError(err) + startDeletingPVTime := time.Now() + err = e2epv.WaitForPersistentVolumeDeleted(ctx, l.cs, pvc.Spec.VolumeName, 1*time.Second, 100*time.Minute) + framework.Logf("Deleted PV %v, PVC %v in %v", pvc.Spec.VolumeName, pvc.GetName(), time.Since(startDeletingPVTime)) + framework.ExpectNoError(err) + }(pvc) + } + wg.Wait() + + endTime := time.Now() // Capture overall end time + totalDuration := endTime.Sub(startTime) + framework.Logf("Deleted all PVC/PVs in %v", totalDuration) // Log total deletion time + + ginkgo.By(fmt.Sprintf("Deleting Storage Class %s", l.scName)) + err := l.cs.StorageV1().StorageClasses().Delete(ctx, l.scName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + } else { + ginkgo.By("Local l setup is nil") + } + }) + + f.It("should delete volumes at scale within performance constraints", f.WithSlow(), f.WithSerial(), func(ctx context.Context) { + l = &local{ + cs: f.ClientSet, + ns: f.Namespace, + options: dInfo.PerformanceTestOptions, + } + l.config = driver.PrepareTest(ctx, f) + + sc := driver.(storageframework.DynamicPVTestDriver).GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType) + ginkgo.By(fmt.Sprintf("Creating Storage Class %v", sc)) + if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { + e2eskipper.Skipf("WaitForFirstConsumer binding mode currently not supported for this test") + } + ginkgo.By(fmt.Sprintf("Creating Storage Class %s", sc.Name)) + sc, err := l.cs.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{}) + framework.ExpectNoError(err) + l.scName = sc.Name + + // Stats for volume provisioning operation; we only need this because imported function newPVCWatch from volumeperf.go requires this as an argument + // (this test itself doesn't use these stats) + provisioningStats := &performanceStats{ + mutex: &sync.Mutex{}, + perObjectInterval: make(map[string]*interval), + operationMetrics: &storageframework.Metrics{}, + } + // Create a controller to watch on PVCs + // When all PVCs provisioned by this test are in the Bound state, the controller + // sends a signal to the channel + controller := newPVCWatch(ctx, f, l.options.ProvisioningOptions.Count, provisioningStats) + l.stopCh = make(chan struct{}) + go controller.Run(l.stopCh) + waitForProvisionCh = make(chan []*v1.PersistentVolumeClaim) + + ginkgo.By(fmt.Sprintf("Creating %d PVCs of size %s", l.options.ProvisioningOptions.Count, l.options.ProvisioningOptions.VolumeSize)) + for i := 0; i < l.options.ProvisioningOptions.Count; i++ { + pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{ + ClaimSize: l.options.ProvisioningOptions.VolumeSize, + StorageClassName: &sc.Name, + }, l.ns.Name) + pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{}) + framework.ExpectNoError(err) + // Store create time for each PVC + provisioningStats.mutex.Lock() + provisioningStats.perObjectInterval[pvc.Name] = &interval{ + create: pvc.CreationTimestamp.Time, + } + provisioningStats.mutex.Unlock() + // Create pods + podConfig := e2epod.Config{ + NS: l.ns.Name, + SeLinuxLabel: e2epv.SELinuxLabel, + } + pod, _ := e2epod.MakeSecPod(&podConfig) + _, err = l.cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + framework.Failf("Failed to create pod [%+v]. Error: %v", pod, err) + } + framework.ExpectNoError(err) + + l.pods = append(l.pods, pod) + } + + ginkgo.By("Waiting for all PVCs to be Bound...") + + select { + case l.pvcs = <-waitForProvisionCh: + framework.Logf("All PVCs in Bound state") + case <-time.After(pvcDeletionTestTimeout): + ginkgo.Fail(fmt.Sprintf("expected all PVCs to be in Bound state within %v", pvcDeletionTestTimeout.Round(time.Second))) + } + }) + +}