Revert "Improve PVC protection controller's scalability by batch-processing PVCs by namespace & caching live pod list results"

pwschuurman 2024-08-15 12:28:09 -07:00 committed by Peter Schuurman
parent f26cf38a50
commit dbcbdbf5fb
4 changed files with 54 additions and 513 deletions
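For reference, the change being reverted made the PVC protection controller batch queued PVCs by namespace and resolve each batch against a single, lazily fetched live pod list instead of issuing one pod list per PVC. The stand-alone Go sketch below condenses that idea in the same shape as the removed pvcProcessingStore and LazyLivePodList types shown in the diff; fetchLivePods and the other names here are illustrative stand-ins, not the controller's real API.

package main

import "fmt"

// pvcRef mirrors the pvcData pair (queue key and name) kept per namespace.
type pvcRef struct {
	key  string
	name string
}

// batcher plays the role of pvcProcessingStore: it groups queued PVC keys by
// namespace and remembers the order in which namespaces became pending.
type batcher struct {
	byNamespace map[string][]pvcRef
	order       []string // stand-in for the namespace workqueue
}

func newBatcher() *batcher {
	return &batcher{byNamespace: make(map[string][]pvcRef)}
}

// add enqueues the namespace on first use and appends the PVC to its batch.
func (b *batcher) add(ns, key, name string) {
	if _, ok := b.byNamespace[ns]; !ok {
		b.order = append(b.order, ns)
	}
	b.byNamespace[ns] = append(b.byNamespace[ns], pvcRef{key: key, name: name})
}

// flushNext pops one namespace and hands back its whole batch, like
// flushNextPVCsByNamespace in the removed code.
func (b *batcher) flushNext() (string, []pvcRef, bool) {
	if len(b.order) == 0 {
		return "", nil, false
	}
	ns := b.order[0]
	b.order = b.order[1:]
	pvcs := b.byNamespace[ns]
	delete(b.byNamespace, ns)
	return ns, pvcs, true
}

// fetchLivePods is a hypothetical stand-in for the expensive live pod list
// against the API server; in the real controller it only runs when the
// informer cache cannot prove the PVC is still in use.
func fetchLivePods(ns string) []string {
	return []string{"pod-a", "pod-b"}
}

func main() {
	b := newBatcher()
	b.add("default", "default/pvc1", "pvc1")
	b.add("default", "default/pvc2", "pvc2")

	for {
		ns, pvcs, ok := b.flushNext()
		if !ok {
			break
		}
		var livePods []string // fetched lazily: at most one list per namespace batch
		for _, pvc := range pvcs {
			if livePods == nil {
				livePods = fetchLivePods(ns)
			}
			fmt.Printf("checked %s in %s against %d live pods\n", pvc.name, ns, len(livePods))
		}
	}
}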


@ -19,7 +19,6 @@ package pvcprotection
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@ -42,65 +41,6 @@ import (
// Controller is controller that removes PVCProtectionFinalizer
// from PVCs that are used by no pods.
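// LazyLivePodList holds a per-namespace live pod list that is fetched at most
// once per batch and reused for every PVC in that batch.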
type LazyLivePodList struct {
cache []v1.Pod
controller *Controller
}
func (ll *LazyLivePodList) getCache() []v1.Pod {
return ll.cache
}
func (ll *LazyLivePodList) setCache(pods []v1.Pod) {
ll.cache = pods
}
type pvcData struct {
pvcKey string
pvcName string
}
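// pvcProcessingStore maps each namespace to its pending PVCs and keeps a queue
// of namespaces awaiting processing.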
type pvcProcessingStore struct {
namespaceToPVCsMap map[string][]pvcData
namespaceQueue workqueue.TypedInterface[string]
mu sync.Mutex
}
func NewPVCProcessingStore() *pvcProcessingStore {
return &pvcProcessingStore{
namespaceToPVCsMap: make(map[string][]pvcData),
namespaceQueue: workqueue.NewTyped[string](),
}
}
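// addOrUpdate appends the PVC to its namespace's batch, enqueuing the
// namespace the first time a PVC for it is seen.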
func (m *pvcProcessingStore) addOrUpdate(namespace string, pvcKey, pvcName string) {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.namespaceToPVCsMap[namespace]; !exists {
m.namespaceToPVCsMap[namespace] = make([]pvcData, 0)
m.namespaceQueue.Add(namespace)
}
m.namespaceToPVCsMap[namespace] = append(m.namespaceToPVCsMap[namespace], pvcData{pvcKey: pvcKey, pvcName: pvcName})
}
// Returns a list of PVCs and the associated namespace to be processed downstream
func (m *pvcProcessingStore) flushNextPVCsByNamespace() ([]pvcData, string) {
nextNamespace, quit := m.namespaceQueue.Get()
if quit {
return nil, nextNamespace
}
m.mu.Lock()
defer m.mu.Unlock()
pvcs := m.namespaceToPVCsMap[nextNamespace]
delete(m.namespaceToPVCsMap, nextNamespace)
m.namespaceQueue.Done(nextNamespace)
return pvcs, nextNamespace
}
type Controller struct {
client clientset.Interface
@ -111,8 +51,7 @@ type Controller struct {
podListerSynced cache.InformerSynced
podIndexer cache.Indexer
queue workqueue.TypedRateLimitingInterface[string]
pvcProcessingStore *pvcProcessingStore
queue workqueue.TypedRateLimitingInterface[string]
}
// NewPVCProtectionController returns a new instance of PVCProtectionController.
@ -123,7 +62,6 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "pvcprotection"},
),
pvcProcessingStore: NewPVCProcessingStore(),
}
e.pvcLister = pvcInformer.Lister()
@ -162,7 +100,6 @@ func NewPVCProtectionController(logger klog.Logger, pvcInformer coreinformers.Pe
func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer c.pvcProcessingStore.namespaceQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PVC protection controller")
@ -172,64 +109,45 @@ func (c *Controller) Run(ctx context.Context, workers int) {
return
}
go wait.UntilWithContext(ctx, c.runMainWorker, time.Second)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runProcessNamespaceWorker, time.Second)
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-ctx.Done()
}
// Main worker batch-pulls PVC items off the informer's work queue and populates the namespace queue and namespace-to-PVCs map
func (c *Controller) runMainWorker(ctx context.Context) {
for c.processNextWorkItem() {
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
// Consumer worker pulls items off namespace queue and processes associated PVCs
func (c *Controller) runProcessNamespaceWorker(ctx context.Context) {
for c.processPVCsByNamespace(ctx) {
}
}
func (c *Controller) processNextWorkItem() bool {
queueLength := c.queue.Len()
for i := 0; i < queueLength; i++ {
pvcKey, quit := c.queue.Get()
if quit {
return false
}
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
}
c.pvcProcessingStore.addOrUpdate(pvcNamespace, pvcKey, pvcName)
}
return !c.queue.ShuttingDown()
}
func (c *Controller) processPVCsByNamespace(ctx context.Context) bool {
pvcList, namespace := c.pvcProcessingStore.flushNextPVCsByNamespace()
if pvcList == nil {
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
pvcKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(pvcKey)
lazyLivePodList := &LazyLivePodList{controller: c}
for _, item := range pvcList {
pvcKey, pvcName := item.pvcKey, item.pvcName
err := c.processPVC(ctx, namespace, pvcName, lazyLivePodList)
if err == nil {
c.queue.Forget(pvcKey)
} else {
c.queue.AddRateLimited(pvcKey)
utilruntime.HandleError(fmt.Errorf("PVC %v in namespace %v failed with: %w", pvcName, namespace, err))
}
c.queue.Done(pvcKey)
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error parsing PVC key %q: %w", pvcKey, err))
return true
}
err = c.processPVC(ctx, pvcNamespace, pvcName)
if err == nil {
c.queue.Forget(pvcKey)
return true
}
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %w", pvcKey, err))
c.queue.AddRateLimited(pvcKey)
return true
}
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string, lazyLivePodList *LazyLivePodList) error {
func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName string) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Processing PVC", "PVC", klog.KRef(pvcNamespace, pvcName))
startTime := time.Now()
@ -249,7 +167,7 @@ func (c *Controller) processPVC(ctx context.Context, pvcNamespace, pvcName strin
if protectionutil.IsDeletionCandidate(pvc, volumeutil.PVCProtectionFinalizer) {
// PVC should be deleted. Check if it's used and remove finalizer if
// it's not.
isUsed, err := c.isBeingUsed(ctx, pvc, lazyLivePodList)
isUsed, err := c.isBeingUsed(ctx, pvc)
if err != nil {
return err
}
@ -291,11 +209,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, pvc *v1.PersistentVolu
logger.Error(err, "Error removing protection finalizer from PVC", "PVC", klog.KObj(pvc))
return err
}
logger.Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
logger.V(3).Info("Removed protection finalizer from PVC", "PVC", klog.KObj(pvc))
return nil
}
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
// Look for a Pod using pvc in the Informer's cache. If one is found the
// correct decision to keep pvc is taken without doing an expensive live
// list.
@ -311,9 +229,7 @@ func (c *Controller) isBeingUsed(ctx context.Context, pvc *v1.PersistentVolumeCl
// mean such a Pod doesn't exist: it might just not be in the cache yet. To
// be 100% confident that it is safe to delete pvc make sure no Pod is using
// it among those returned by a live list.
// Use lazy live pod list instead of directly calling API server
return c.askAPIServer(ctx, pvc, lazyLivePodList)
return c.askAPIServer(ctx, pvc)
}
func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeClaim) (bool, error) {
@ -342,24 +258,16 @@ func (c *Controller) askInformer(logger klog.Logger, pvc *v1.PersistentVolumeCla
return false, nil
}
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim, lazyLivePodList *LazyLivePodList) (bool, error) {
func (c *Controller) askAPIServer(ctx context.Context, pvc *v1.PersistentVolumeClaim) (bool, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info("Looking for Pods using PVC with a live list", "PVC", klog.KObj(pvc))
if lazyLivePodList.getCache() == nil {
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
}
if podsList.Items == nil {
lazyLivePodList.setCache(make([]v1.Pod, 0))
} else {
lazyLivePodList.setCache(podsList.Items)
}
podsList, err := c.client.CoreV1().Pods(pvc.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("live list of pods failed: %s", err.Error())
}
for _, pod := range lazyLivePodList.getCache() {
for _, pod := range podsList.Items {
if c.podUsesPVC(logger, &pod, pvc) {
return true, nil
}


@ -47,7 +47,6 @@ type reaction struct {
const (
defaultNS = "default"
namespace2 = "namespace-2"
defaultPVCName = "pvc1"
defaultPodName = "pod1"
defaultNodeName = "node1"
@ -70,22 +69,6 @@ func pod() *v1.Pod {
}
}
func podWithConfig(name string, namespace string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
UID: defaultUID,
},
Spec: v1.PodSpec{
NodeName: defaultNodeName,
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}
}
func unscheduled(pod *v1.Pod) *v1.Pod {
pod.Spec.NodeName = ""
return pod
@ -134,15 +117,6 @@ func pvc() *v1.PersistentVolumeClaim {
}
}
func pvcWithConfig(name string, namespace string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}
func withProtectionFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
pvc.Finalizers = append(pvc.Finalizers, volumeutil.PVCProtectionFinalizer)
return pvc
@ -172,23 +146,6 @@ func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionF
}
}
func generatePodListErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
i := 0
return func(action clienttesting.Action) (bool, runtime.Object, error) {
i++
if i <= failures {
// Pod List fails
list, ok := action.(clienttesting.ListAction)
if !ok {
t.Fatalf("Reactor got non-list action: %+v", action)
}
return true, nil, apierrors.NewForbidden(list.GetResource().GroupResource(), "mock pod", errors.New("Mock error"))
}
// List succeeds
return false, nil, nil
}
}
func TestPVCProtectionController(t *testing.T) {
pvcGVR := schema.GroupVersionResource{
Group: v1.GroupName,
@ -218,7 +175,7 @@ func TestPVCProtectionController(t *testing.T) {
reactors []reaction
// PVC event to simulate. This PVC will be automatically added to
// initialObjects.
updatedPVCs []*v1.PersistentVolumeClaim
updatedPVC *v1.PersistentVolumeClaim
// Pod event to simulate. This Pod will be automatically added to
// initialObjects.
updatedPod *v1.Pod
@ -233,21 +190,20 @@ func TestPVCProtectionController(t *testing.T) {
// PVC events
//
{
name: "PVC without finalizer -> finalizer is added",
updatedPVCs: []*v1.PersistentVolumeClaim{pvc(), pvcWithConfig("pvc2", "namespace-2")},
name: "PVC without finalizer -> finalizer is added",
updatedPVC: pvc(),
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(pvcGVR, defaultNS, withProtectionFinalizer(pvc())),
clienttesting.NewUpdateAction(pvcGVR, "namespace-2", withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))),
},
},
{
name: "PVC with finalizer -> no action",
updatedPVCs: []*v1.PersistentVolumeClaim{withProtectionFinalizer(pvc()), withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2"))},
updatedPVC: withProtectionFinalizer(pvc()),
expectedActions: []clienttesting.Action{},
},
{
name: "saving PVC finalizer fails -> controller retries",
updatedPVCs: []*v1.PersistentVolumeClaim{pvc()},
name: "saving PVC finalizer fails -> controller retries",
updatedPVC: pvc(),
reactors: []reaction{
{
verb: "update",
@ -265,29 +221,16 @@ func TestPVCProtectionController(t *testing.T) {
},
},
{
name: "deleted PVC with finalizers across different namespaces -> finalizer is removed",
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
deleted(withProtectionFinalizer(pvcWithConfig("pvc2", "namespace-2")))},
name: "deleted PVC with finalizer -> finalizer is removed",
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
clienttesting.NewListAction(podGVR, podGVK, "namespace-2", metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, "namespace-2", deleted(pvcWithConfig("pvc2", "namespace-2"))),
},
},
{
name: "multiple PVCs with finalizer for the same namespace; no alive pods -> finalizer is removed and only one API pod list is made",
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))},
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
},
},
{
name: "finalizer removal fails -> controller retries",
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
name: "finalizer removal fails -> controller retries",
updatedPVC: deleted(withProtectionFinalizer(pvc())),
reactors: []reaction{
{
verb: "update",
@ -307,33 +250,12 @@ func TestPVCProtectionController(t *testing.T) {
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
},
},
{
name: "delete multiple PVCs of the same namespace; pod list fails for one PVC -> add failing PVC back to queue and continue to the next PVC",
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS)))},
reactors: []reaction{
{
verb: "list",
resource: "pods",
reactorfn: generatePodListErrorFunc(t, 1 /* pod list fails once */),
},
},
expectedActions: []clienttesting.Action{
// Fails
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
// Succeed with next pvc in queue
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
},
},
{
name: "deleted PVC with finalizer + pod with the PVC exists -> finalizer is not removed",
initialObjects: []runtime.Object{
withPVC(defaultPVCName, pod()),
},
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{},
},
{
@ -341,34 +263,18 @@ func TestPVCProtectionController(t *testing.T) {
initialObjects: []runtime.Object{
withEmptyDir(withPVC("unrelatedPVC", pod())),
},
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
},
},
{
name: "deleted multiple PVCs with finalizer (same namespace) + pod with unrelated PVC and EmptyDir exists -> only one live pod list & finalizers are removed",
initialObjects: []runtime.Object{
withEmptyDir(withPVC("unrelatedPVC", pod())),
},
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())),
deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))),
deleted(withProtectionFinalizer(pvcWithConfig("pvc3", defaultNS))),
},
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvc())),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc3", defaultNS))),
},
},
{
name: "deleted PVC with finalizer + pod with the PVC finished but is not deleted -> finalizer is not removed",
initialObjects: []runtime.Object{
withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
},
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{},
},
{
@ -377,26 +283,11 @@ func TestPVCProtectionController(t *testing.T) {
withPVC(defaultPVCName, pod()),
},
informersAreLate: true,
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc()))},
updatedPVC: deleted(withProtectionFinalizer(pvc())),
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
},
},
{
name: "mix of deleted PVCs with some used by a pod and some unused -> finalizer is removed only for unused PVCs",
initialObjects: []runtime.Object{
withPVC(defaultPVCName, pod()),
withPVC("pvc3", podWithConfig("pod2", "namespace-3")),
},
informersAreLate: true,
updatedPVCs: []*v1.PersistentVolumeClaim{deleted(withProtectionFinalizer(pvc())), deleted(withProtectionFinalizer(pvcWithConfig("pvc2", defaultNS))), deleted(withProtectionFinalizer(pvcWithConfig("pvc3", "namespace-3")))},
expectedActions: []clienttesting.Action{
clienttesting.NewListAction(podGVR, podGVK, defaultNS, metav1.ListOptions{}),
clienttesting.NewUpdateAction(pvcGVR, defaultNS, deleted(pvcWithConfig("pvc2", defaultNS))),
clienttesting.NewListAction(podGVR, podGVK, "namespace-3", metav1.ListOptions{}),
},
},
//
// Pod events
//
@ -485,11 +376,9 @@ func TestPVCProtectionController(t *testing.T) {
clientObjs []runtime.Object
informersObjs []runtime.Object
)
if test.updatedPVCs != nil {
for i := 0; i < len(test.updatedPVCs); i++ {
clientObjs = append(clientObjs, test.updatedPVCs[i])
informersObjs = append(informersObjs, test.updatedPVCs[i])
}
if test.updatedPVC != nil {
clientObjs = append(clientObjs, test.updatedPVC)
informersObjs = append(informersObjs, test.updatedPVC)
}
if test.updatedPod != nil {
clientObjs = append(clientObjs, test.updatedPod)
@ -499,6 +388,7 @@ func TestPVCProtectionController(t *testing.T) {
if !test.informersAreLate {
informersObjs = append(informersObjs, test.initialObjects...)
}
// Create client with initial data
client := fake.NewSimpleClientset(clientObjs...)
@ -533,10 +423,8 @@ func TestPVCProtectionController(t *testing.T) {
}
// Start the test by simulating an event
if test.updatedPVCs != nil {
for i := 0; i < len(test.updatedPVCs); i++ {
ctrl.pvcAddedUpdated(logger, test.updatedPVCs[i])
}
if test.updatedPVC != nil {
ctrl.pvcAddedUpdated(logger, test.updatedPVC)
}
switch {
case test.deletedPod != nil && test.updatedPod != nil && test.deletedPod.Namespace == test.updatedPod.Namespace && test.deletedPod.Name == test.updatedPod.Name:
@ -557,18 +445,12 @@ func TestPVCProtectionController(t *testing.T) {
}
if ctrl.queue.Len() > 0 {
logger.V(5).Info("Non-empty queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
ctx := context.TODO()
ctrl.processNextWorkItem()
for ctrl.pvcProcessingStore.namespaceQueue.Len() != 0 {
ctrl.processPVCsByNamespace(ctx)
}
ctrl.processNextWorkItem(context.TODO())
}
if ctrl.queue.Len() > 0 {
// There is still some work in the queue, process it now
continue
}
currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) {
// Do not log every wait, only when the action count changes.


@ -81,7 +81,6 @@ var CSISuites = append(BaseSuites,
InitSnapshottableTestSuite,
InitSnapshottableStressTestSuite,
InitVolumePerformanceTestSuite,
InitPvcDeletionPerformanceTestSuite,
InitReadWriteOncePodTestSuite,
InitVolumeModifyTestSuite,
)


@ -1,248 +0,0 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testsuites
import (
"context"
"fmt"
"sync"
"time"
"github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
admissionapi "k8s.io/pod-security-admission/api"
)
type pvcDeletionPerformanceTestSuite struct {
tsInfo storageframework.TestSuiteInfo
}
var _ storageframework.TestSuite = &pvcDeletionPerformanceTestSuite{}
const pvcDeletionTestTimeout = 30 * time.Minute
// InitPvcDeletionPerformanceTestSuite returns pvcDeletionPerformanceTestSuite that implements TestSuite interface
// This test suite brings up a number of pods and PVCs (configured upstream), deletes the pods, and then deletes the PVCs.
// The main goal is to record the duration of the PVC/PV deletion process for each run, so the test does not set explicit expectations to match against.
func InitPvcDeletionPerformanceTestSuite() storageframework.TestSuite {
return &pvcDeletionPerformanceTestSuite{
tsInfo: storageframework.TestSuiteInfo{
Name: "pvc-deletion-performance",
TestPatterns: []storageframework.TestPattern{
storageframework.BlockVolModeDynamicPV,
},
},
}
}
func (t *pvcDeletionPerformanceTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
return t.tsInfo
}
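// SkipUnsupportedTests is intentionally empty; the suite runs for every pattern it declares.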
func (t *pvcDeletionPerformanceTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
}
func (t *pvcDeletionPerformanceTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
type local struct {
config *storageframework.PerTestConfig
cs clientset.Interface
ns *v1.Namespace
scName string
pvcs []*v1.PersistentVolumeClaim
options *storageframework.PerformanceTestOptions
stopCh chan struct{}
pods []*v1.Pod
}
var (
dInfo *storageframework.DriverInfo
l *local
)
ginkgo.BeforeEach(func() {
// Check preconditions
dDriver := driver.(storageframework.DynamicPVTestDriver)
if dDriver == nil {
e2eskipper.Skipf("Test driver does not support dynamically created volumes")
}
dInfo = dDriver.GetDriverInfo()
if dInfo == nil {
e2eskipper.Skipf("Failed to get Driver info -- skipping")
}
if dInfo.PerformanceTestOptions == nil || dInfo.PerformanceTestOptions.ProvisioningOptions == nil {
e2eskipper.Skipf("Driver %s doesn't specify performance test options -- skipping", dInfo.Name)
}
})
// Set high QPS for the framework to avoid client-side throttling from the test itself,
// which can interfere with measuring deletion time
frameworkOptions := framework.Options{
ClientQPS: 500,
ClientBurst: 1000,
}
f := framework.NewFramework("pvc-deletion-performance", frameworkOptions, nil)
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.AfterEach(func(ctx context.Context) {
if l != nil {
if l.stopCh != nil {
ginkgo.By("Closing informer channel")
close(l.stopCh)
}
deletingStats := &performanceStats{
mutex: &sync.Mutex{},
perObjectInterval: make(map[string]*interval),
operationMetrics: &storageframework.Metrics{},
}
var (
errs []error
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(len(l.pods))
for _, pod := range l.pods {
go func(pod *v1.Pod) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
framework.Logf("Deleting pod %v", pod.Name)
err := e2epod.DeletePodWithWait(ctx, l.cs, pod)
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(pod)
}
wg.Wait()
ginkgo.By("Deleting all PVCs")
startTime := time.Now()
wg.Add(len(l.pvcs))
for _, pvc := range l.pvcs {
go func(pvc *v1.PersistentVolumeClaim) { // Start a goroutine for each PVC
defer wg.Done() // Decrement the counter when the goroutine finishes
startDeletingPvcTime := time.Now()
framework.Logf("Start deleting PVC %v", pvc.GetName())
deletingStats.mutex.Lock()
deletingStats.perObjectInterval[pvc.Name] = &interval{
create: startDeletingPvcTime,
}
deletingStats.mutex.Unlock()
err := e2epv.DeletePersistentVolumeClaim(ctx, l.cs, pvc.Name, pvc.Namespace)
framework.ExpectNoError(err)
startDeletingPVTime := time.Now()
err = e2epv.WaitForPersistentVolumeDeleted(ctx, l.cs, pvc.Spec.VolumeName, 1*time.Second, 100*time.Minute)
framework.Logf("Deleted PV %v, PVC %v in %v", pvc.Spec.VolumeName, pvc.GetName(), time.Since(startDeletingPVTime))
framework.ExpectNoError(err)
}(pvc)
}
wg.Wait()
endTime := time.Now() // Capture overall end time
totalDuration := endTime.Sub(startTime)
framework.Logf("Deleted all PVC/PVs in %v", totalDuration) // Log total deletion time
ginkgo.By(fmt.Sprintf("Deleting Storage Class %s", l.scName))
err := l.cs.StorageV1().StorageClasses().Delete(ctx, l.scName, metav1.DeleteOptions{})
framework.ExpectNoError(err)
} else {
ginkgo.By("Local l setup is nil")
}
})
f.It("should delete volumes at scale within performance constraints", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
l = &local{
cs: f.ClientSet,
ns: f.Namespace,
options: dInfo.PerformanceTestOptions,
}
l.config = driver.PrepareTest(ctx, f)
sc := driver.(storageframework.DynamicPVTestDriver).GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType)
ginkgo.By(fmt.Sprintf("Creating Storage Class %v", sc))
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
e2eskipper.Skipf("WaitForFirstConsumer binding mode currently not supported for this test")
}
ginkgo.By(fmt.Sprintf("Creating Storage Class %s", sc.Name))
sc, err := l.cs.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{})
framework.ExpectNoError(err)
l.scName = sc.Name
// Stats for volume provisioning operation; we only need this because imported function newPVCWatch from volumeperf.go requires this as an argument
// (this test itself doesn't use these stats)
provisioningStats := &performanceStats{
mutex: &sync.Mutex{},
perObjectInterval: make(map[string]*interval),
operationMetrics: &storageframework.Metrics{},
}
// Create a controller to watch on PVCs
// When all PVCs provisioned by this test are in the Bound state, the controller
// sends a signal to the channel
controller := newPVCWatch(ctx, f, l.options.ProvisioningOptions.Count, provisioningStats)
l.stopCh = make(chan struct{})
go controller.Run(l.stopCh)
waitForProvisionCh = make(chan []*v1.PersistentVolumeClaim)
ginkgo.By(fmt.Sprintf("Creating %d PVCs of size %s", l.options.ProvisioningOptions.Count, l.options.ProvisioningOptions.VolumeSize))
for i := 0; i < l.options.ProvisioningOptions.Count; i++ {
pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
ClaimSize: l.options.ProvisioningOptions.VolumeSize,
StorageClassName: &sc.Name,
}, l.ns.Name)
pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{})
framework.ExpectNoError(err)
// Store create time for each PVC
provisioningStats.mutex.Lock()
provisioningStats.perObjectInterval[pvc.Name] = &interval{
create: pvc.CreationTimestamp.Time,
}
provisioningStats.mutex.Unlock()
// Create pods
podConfig := e2epod.Config{
NS: l.ns.Name,
SeLinuxLabel: e2epv.SELinuxLabel,
}
pod, _ := e2epod.MakeSecPod(&podConfig)
_, err = l.cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
framework.Failf("Failed to create pod [%+v]. Error: %v", pod, err)
}
framework.ExpectNoError(err)
l.pods = append(l.pods, pod)
}
ginkgo.By("Waiting for all PVCs to be Bound...")
select {
case l.pvcs = <-waitForProvisionCh:
framework.Logf("All PVCs in Bound state")
case <-time.After(pvcDeletionTestTimeout):
ginkgo.Fail(fmt.Sprintf("expected all PVCs to be in Bound state within %v", pvcDeletionTestTimeout.Round(time.Second)))
}
})
}