Merge pull request #123549 from carlory/kep-3751-finalizer

A new controller adds/removes finalizer to VAC for protection
This commit is contained in:
Kubernetes Prow Robot 2024-11-05 21:45:30 +00:00 committed by GitHub
commit 08391b3d27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1052 additions and 0 deletions

View File

@ -567,6 +567,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor {
register(newClusterRoleAggregrationControllerDescriptor())
register(newPersistentVolumeClaimProtectionControllerDescriptor())
register(newPersistentVolumeProtectionControllerDescriptor())
register(newVolumeAttributesClassProtectionControllerDescriptor())
register(newTTLAfterFinishedControllerDescriptor())
register(newRootCACertificatePublisherControllerDescriptor())
register(newEphemeralVolumeControllerDescriptor())

View File

@ -86,6 +86,7 @@ func TestControllerNamesDeclaration(t *testing.T) {
names.ClusterRoleAggregationController,
names.PersistentVolumeClaimProtectionController,
names.PersistentVolumeProtectionController,
names.VolumeAttributesClassProtectionController,
names.TTLAfterFinishedController,
names.RootCACertificatePublisherController,
names.EphemeralVolumeController,

View File

@ -64,6 +64,7 @@ import (
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
"k8s.io/kubernetes/pkg/controller/volume/pvprotection"
"k8s.io/kubernetes/pkg/controller/volume/vacprotection"
"k8s.io/kubernetes/pkg/features"
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -684,6 +685,31 @@ func startPersistentVolumeProtectionController(ctx context.Context, controllerCo
return nil, true, nil
}
func newVolumeAttributesClassProtectionControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.VolumeAttributesClassProtectionController,
initFunc: startVolumeAttributesClassProtectionController,
requiredFeatureGates: []featuregate.Feature{
features.VolumeAttributesClass,
},
}
}
func startVolumeAttributesClassProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
vacProtectionController, err := vacprotection.NewVACProtectionController(
klog.FromContext(ctx),
controllerContext.ClientBuilder.ClientOrDie("volumeattributesclass-protection-controller"),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
controllerContext.InformerFactory.Storage().V1beta1().VolumeAttributesClasses(),
)
if err != nil {
return nil, true, fmt.Errorf("failed to start the vac protection controller: %w", err)
}
go vacProtectionController.Run(ctx, 1)
return nil, true, nil
}
func newTTLAfterFinishedControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.TTLAfterFinishedController,

View File

@ -82,6 +82,7 @@ const (
ResourceClaimController = "resourceclaim-controller"
LegacyServiceAccountTokenCleanerController = "legacy-serviceaccount-token-cleaner-controller"
ValidatingAdmissionPolicyStatusController = "validatingadmissionpolicy-status-controller"
VolumeAttributesClassProtectionController = "volumeattributesclass-protection-controller"
ServiceCIDRController = "service-cidr-controller"
StorageVersionMigratorController = "storage-version-migrator-controller"
)

View File

@ -0,0 +1,216 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package protectionutil
import (
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// PodWrapper wraps a Pod inside.
type PodWrapper struct{ v1.Pod }
// MakePod creates a Pod wrapper.
func MakePod() *PodWrapper {
return &PodWrapper{v1.Pod{}}
}
// Obj returns the inner Pod.
func (p *PodWrapper) Obj() *v1.Pod {
return &p.Pod
}
// Name sets `s` as the name of the inner pod.
func (p *PodWrapper) Name(s string) *PodWrapper {
p.SetName(s)
return p
}
// UID sets `s` as the UID of the inner pod.
func (p *PodWrapper) UID(s string) *PodWrapper {
p.SetUID(types.UID(s))
return p
}
// SchedulerName sets `s` as the scheduler name of the inner pod.
func (p *PodWrapper) SchedulerName(s string) *PodWrapper {
p.Spec.SchedulerName = s
return p
}
// Namespace sets `s` as the namespace of the inner pod.
func (p *PodWrapper) Namespace(s string) *PodWrapper {
p.SetNamespace(s)
return p
}
// Terminating sets the inner pod's deletionTimestamp to current timestamp.
func (p *PodWrapper) Terminating() *PodWrapper {
now := metav1.Now()
p.DeletionTimestamp = &now
return p
}
// PVC creates a Volume with a PVC and injects into the inner pod.
func (p *PodWrapper) PVC(name string) *PodWrapper {
p.Spec.Volumes = append(p.Spec.Volumes, v1.Volume{
Name: name,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: name},
},
})
return p
}
// Annotation sets a {k,v} pair to the inner pod annotation.
func (p *PodWrapper) Annotation(key, value string) *PodWrapper {
metav1.SetMetaDataAnnotation(&p.ObjectMeta, key, value)
return p
}
// Annotations sets all {k,v} pair provided by `annotations` to the inner pod annotations.
func (p *PodWrapper) Annotations(annotations map[string]string) *PodWrapper {
for k, v := range annotations {
p.Annotation(k, v)
}
return p
}
// PersistentVolumeClaimWrapper wraps a PersistentVolumeClaim inside.
type PersistentVolumeClaimWrapper struct{ v1.PersistentVolumeClaim }
// MakePersistentVolumeClaim creates a PersistentVolumeClaim wrapper.
func MakePersistentVolumeClaim() *PersistentVolumeClaimWrapper {
return &PersistentVolumeClaimWrapper{}
}
// Obj returns the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) Obj() *v1.PersistentVolumeClaim {
return &p.PersistentVolumeClaim
}
// Name sets `s` as the name of the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) Name(s string) *PersistentVolumeClaimWrapper {
p.SetName(s)
return p
}
// Namespace sets `s` as the namespace of the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) Namespace(s string) *PersistentVolumeClaimWrapper {
p.SetNamespace(s)
return p
}
// Annotation sets a {k,v} pair to the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) Annotation(key, value string) *PersistentVolumeClaimWrapper {
metav1.SetMetaDataAnnotation(&p.ObjectMeta, key, value)
return p
}
// VolumeName sets `name` as the volume name of the inner
// PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) VolumeName(name string) *PersistentVolumeClaimWrapper {
p.PersistentVolumeClaim.Spec.VolumeName = name
return p
}
func (p *PersistentVolumeClaimWrapper) Finalizer(s string) *PersistentVolumeClaimWrapper {
p.Finalizers = append(p.Finalizers, s)
return p
}
// VolumeAttributesClassName sets `s` as the VolumeAttributesClassName of the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) VolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper {
p.Spec.VolumeAttributesClassName = &s
return p
}
// CurrentVolumeAttributesClassName sets `s` as the CurrentVolumeAttributesClassName of the inner PersistentVolumeClaim.
func (p *PersistentVolumeClaimWrapper) CurrentVolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper {
p.Status.CurrentVolumeAttributesClassName = &s
return p
}
// TargetVolumeAttributesClassName sets `s` as the TargetVolumeAttributesClassName of the inner PersistentVolumeClaim.
// It also sets the status to Pending.
func (p *PersistentVolumeClaimWrapper) TargetVolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper {
p.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{
TargetVolumeAttributesClassName: s,
Status: v1.PersistentVolumeClaimModifyVolumePending,
}
return p
}
// PersistentVolumeWrapper wraps a PersistentVolume inside.
type PersistentVolumeWrapper struct{ v1.PersistentVolume }
// MakePersistentVolume creates a PersistentVolume wrapper.
func MakePersistentVolume() *PersistentVolumeWrapper {
return &PersistentVolumeWrapper{}
}
// Obj returns the inner PersistentVolume.
func (p *PersistentVolumeWrapper) Obj() *v1.PersistentVolume {
return &p.PersistentVolume
}
// Name sets `s` as the name of the inner PersistentVolume.
func (p *PersistentVolumeWrapper) Name(s string) *PersistentVolumeWrapper {
p.SetName(s)
return p
}
// VolumeAttributesClassName sets `s` as the VolumeAttributesClassName of the inner PersistentVolume.
func (p *PersistentVolumeWrapper) VolumeAttributesClassName(s string) *PersistentVolumeWrapper {
p.Spec.VolumeAttributesClassName = &s
return p
}
// VolumeAttributesClassWrapper wraps a VolumeAttributesClass inside.
type VolumeAttributesClassWrapper struct {
storagev1beta1.VolumeAttributesClass
}
// MakeVolumeAttributesClass creates a VolumeAttributesClass wrapper.
func MakeVolumeAttributesClass() *VolumeAttributesClassWrapper {
return &VolumeAttributesClassWrapper{}
}
// Obj returns the inner VolumeAttributesClass.
func (v *VolumeAttributesClassWrapper) Obj() *storagev1beta1.VolumeAttributesClass {
return &v.VolumeAttributesClass
}
// Name sets `s` as the name of the inner VolumeAttributesClass.
func (v *VolumeAttributesClassWrapper) Name(s string) *VolumeAttributesClassWrapper {
v.SetName(s)
return v
}
// Terminating sets the inner VolumeAttributesClass' deletionTimestamp to non-nil.
func (v *VolumeAttributesClassWrapper) Terminating() *VolumeAttributesClassWrapper {
v.DeletionTimestamp = &metav1.Time{}
return v
}
// Finalizer appends `s` to the finalizers of the inner VolumeAttributesClass.
func (v *VolumeAttributesClassWrapper) Finalizer(s string) *VolumeAttributesClassWrapper {
v.Finalizers = append(v.Finalizers, s)
return v
}

View File

@ -0,0 +1,437 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vacprotection
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1beta1"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
"k8s.io/kubernetes/pkg/util/slice"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/ptr"
)
const (
vacNameKeyIndex = "volumeAttributesClassName"
)
// Controller is controller that adds and removes VACProtectionFinalizer
// from VACs that are not used by any PV or PVC.
//
// This controller only use informers, so it may remove the finalizer too early.
//
// One scenario is:
// 1. There is a VolumeAttributesClass that is not used by any PVC. This
// VolumeAttributesClass is synced to all informers (external-provisioner,
// external-resizer, KCM)
//
// 2. At the same time:
//
// * User creates a PVC that uses this VolumeAttributesClass.
//
// * Another user deletes the VolumeAttributesClass.
//
// 3. VolumeAttributesClass deletion event with DeletionTimestamp reaches
// this controller. Because the PVC creation event has not yet
// reached KCM informers, the controller lets the VolumeAttributesClass
// to be deleted by removing the finalizer. PVC creation event reaches
// the external-provisioner, before VolumeAttributesClass update. The
// external-provisioner will try to provision a new volume using the
// VolumeAttributesClass that will get deleted soon.
//
// * If the external-provisioner gets the VolumeAttributesClass before
// deletion in the informer, the provisioning will succeed.
//
// * Otherwise the external-prosivioner will fail the provisioning.
//
// Solving this scenario properly requires to Get/List requests to the API server,
// which will cause performance issue in larger cluster similar to the existing
// PVC protection controller - related issue https://github.com/kubernetes/kubernetes/issues/109282
type Controller struct {
client clientset.Interface
pvcSynced cache.InformerSynced
pvSynced cache.InformerSynced
vacLister storagelisters.VolumeAttributesClassLister
vacSynced cache.InformerSynced
getPVsAssignedToVAC func(vacName string) ([]*v1.PersistentVolume, error)
getPVCsAssignedToVAC func(vacName string) ([]*v1.PersistentVolumeClaim, error)
queue workqueue.TypedRateLimitingInterface[string]
}
// NewVACProtectionController returns a new *Controller.
func NewVACProtectionController(logger klog.Logger,
client clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
vacInformer storageinformers.VolumeAttributesClassInformer) (*Controller, error) {
c := &Controller{
client: client,
pvcSynced: pvcInformer.Informer().HasSynced,
pvSynced: pvInformer.Informer().HasSynced,
vacLister: vacInformer.Lister(),
vacSynced: vacInformer.Informer().HasSynced,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "vacprotection",
},
),
}
_, _ = vacInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.vacAddedUpdated(logger, obj)
},
UpdateFunc: func(old, new interface{}) {
c.vacAddedUpdated(logger, new)
},
})
err := pvInformer.Informer().AddIndexers(cache.Indexers{vacNameKeyIndex: func(obj interface{}) ([]string, error) {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return []string{}, nil
}
return getPVReferencedVACNames(pv), nil
}})
if err != nil {
return nil, fmt.Errorf("failed to add index to PV informer: %w", err)
}
pvIndexer := pvInformer.Informer().GetIndexer()
c.getPVsAssignedToVAC = func(vacName string) ([]*v1.PersistentVolume, error) {
objs, err := pvIndexer.ByIndex(vacNameKeyIndex, vacName)
if err != nil {
return nil, err
}
pvcs := make([]*v1.PersistentVolume, 0, len(objs))
for _, obj := range objs {
pvc, ok := obj.(*v1.PersistentVolume)
if !ok {
continue
}
pvcs = append(pvcs, pvc)
}
return pvcs, nil
}
err = pvcInformer.Informer().AddIndexers(cache.Indexers{vacNameKeyIndex: func(obj interface{}) ([]string, error) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return []string{}, nil
}
return getPVCReferencedVACNames(pvc), nil
}})
if err != nil {
return nil, fmt.Errorf("failed to add index to PVC informer: %w", err)
}
pvcIndexer := pvcInformer.Informer().GetIndexer()
c.getPVCsAssignedToVAC = func(vacName string) ([]*v1.PersistentVolumeClaim, error) {
objs, err := pvcIndexer.ByIndex(vacNameKeyIndex, vacName)
if err != nil {
return nil, err
}
pvcs := make([]*v1.PersistentVolumeClaim, 0, len(objs))
for _, obj := range objs {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
continue
}
pvcs = append(pvcs, pvc)
}
return pvcs, nil
}
_, _ = pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
c.pvcUpdated(logger, old, new)
},
DeleteFunc: func(obj interface{}) {
c.pvcDeleted(logger, obj)
},
})
_, _ = pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
c.pvUpdated(logger, old, new)
},
DeleteFunc: func(obj interface{}) {
c.pvDeleted(logger, obj)
},
})
return c, nil
}
// Run runs the controller goroutines.
func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting VAC protection controller")
defer logger.Info("Shutting down VAC protection controller")
if !cache.WaitForNamedCacheSync("VAC protection", ctx.Done(), c.pvSynced, c.pvcSynced, c.vacSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-ctx.Done()
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
vacKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(vacKey)
err := c.processVAC(ctx, vacKey)
if err == nil {
c.queue.Forget(vacKey)
return true
}
utilruntime.HandleError(fmt.Errorf("VAC %v failed with : %w", vacKey, err))
c.queue.AddRateLimited(vacKey)
return true
}
func (c *Controller) processVAC(ctx context.Context, vacName string) error {
logger := klog.FromContext(ctx)
logger.V(4).Info("Processing VAC", "VAC", klog.KRef("", vacName))
startTime := time.Now()
defer func() {
logger.V(4).Info("Finished processing VAC", "VAC", klog.KRef("", vacName), "cost", time.Since(startTime))
}()
vac, err := c.vacLister.Get(vacName)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(4).Info("VAC not found, ignoring", "VAC", klog.KRef("", vacName))
return nil
}
return err
}
if protectionutil.IsDeletionCandidate(vac, volumeutil.VACProtectionFinalizer) {
// VAC should be deleted. Check if it's used and remove finalizer if
// it's not.
isUsed := c.isBeingUsed(ctx, vac)
if !isUsed {
return c.removeFinalizer(ctx, vac)
}
logger.V(4).Info("Keeping VAC because it is being used", "PVC", klog.KRef("", vacName))
}
if protectionutil.NeedToAddFinalizer(vac, volumeutil.VACProtectionFinalizer) {
return c.addFinalizer(ctx, vac)
}
return nil
}
func (c *Controller) addFinalizer(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) error {
vacClone := vac.DeepCopy()
vacClone.ObjectMeta.Finalizers = append(vacClone.ObjectMeta.Finalizers, volumeutil.VACProtectionFinalizer)
_, err := c.client.StorageV1beta1().VolumeAttributesClasses().Update(ctx, vacClone, metav1.UpdateOptions{})
logger := klog.FromContext(ctx)
if err != nil {
logger.V(3).Info("Error adding protection finalizer to VAC", "VAC", klog.KObj(vac), "err", err)
return err
}
logger.V(3).Info("Added protection finalizer to VAC", "VAC", klog.KObj(vac))
return nil
}
func (c *Controller) removeFinalizer(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) error {
vacClone := vac.DeepCopy()
vacClone.ObjectMeta.Finalizers = slice.RemoveString(vacClone.ObjectMeta.Finalizers, volumeutil.VACProtectionFinalizer, nil)
_, err := c.client.StorageV1beta1().VolumeAttributesClasses().Update(ctx, vacClone, metav1.UpdateOptions{})
logger := klog.FromContext(ctx)
if err != nil {
logger.V(3).Info("Error removing protection finalizer from VAC", "VAC", klog.KObj(vac), "err", err)
return err
}
logger.V(3).Info("Removed protection finalizer from VAC", "VAC", klog.KObj(vac))
return nil
}
func (c *Controller) isBeingUsed(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) bool {
logger := klog.FromContext(ctx)
pvs, err := c.getPVsAssignedToVAC(vac.Name)
if err != nil {
logger.Error(err, "Error getting PVs assigned to VAC", "VAC", klog.KObj(vac))
return true
}
if len(pvs) > 0 {
return true
}
pvcs, err := c.getPVCsAssignedToVAC(vac.Name)
if err != nil {
logger.Error(err, "Error getting PVCs assigned to VAC", "VAC", klog.KObj(vac))
return true
}
if len(pvcs) > 0 {
return true
}
return false
}
// pvAddedUpdated reacts to vac added/updated events
func (c *Controller) vacAddedUpdated(logger klog.Logger, obj interface{}) {
vac, ok := obj.(*storagev1beta1.VolumeAttributesClass)
if !ok {
utilruntime.HandleError(fmt.Errorf("VAC informer returned non-VAC object: %#v", obj))
return
}
logger.V(4).Info("Got event on VAC", "VAC", klog.KObj(vac))
if protectionutil.NeedToAddFinalizer(vac, volumeutil.VACProtectionFinalizer) || protectionutil.IsDeletionCandidate(vac, volumeutil.VACProtectionFinalizer) {
c.queue.Add(vac.Name)
}
}
// pvcDeleted reacts to pvc deleted events
func (c *Controller) pvcDeleted(logger klog.Logger, obj interface{}) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
return
}
logger.V(4).Info("Got event on PVC", "PVC", klog.KObj(pvc))
vacNames := getPVCReferencedVACNames(pvc)
for _, vacName := range vacNames {
c.queue.Add(vacName)
}
}
// pvcUpdated reacts to pvc updated events
func (c *Controller) pvcUpdated(logger klog.Logger, old, new interface{}) {
oldPVC, ok := old.(*v1.PersistentVolumeClaim)
if !ok {
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", old))
return
}
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", new))
return
}
logger.V(4).Info("Got event on PVC", "PVC", klog.KObj(newPVC))
vavNames := sets.New(getPVCReferencedVACNames(oldPVC)...).Delete(getPVCReferencedVACNames(newPVC)...).UnsortedList()
for _, vacName := range vavNames {
c.queue.Add(vacName)
}
}
// pvUpdated reacts to pv updated events
func (c *Controller) pvUpdated(logger klog.Logger, old, new interface{}) {
oldPV, ok := old.(*v1.PersistentVolume)
if !ok {
utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", old))
return
}
newPV, ok := new.(*v1.PersistentVolume)
if !ok {
utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", new))
return
}
logger.V(4).Info("Got event on PV", "PV", klog.KObj(newPV))
vavNames := sets.New(getPVReferencedVACNames(oldPV)...).Delete(getPVReferencedVACNames(newPV)...).UnsortedList()
for _, vacName := range vavNames {
c.queue.Add(vacName)
}
}
// pvDeleted reacts to pv deleted events
func (c *Controller) pvDeleted(logger klog.Logger, obj interface{}) {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj))
return
}
logger.V(4).Info("Got event on PV", "PV", klog.KObj(pv))
vacNames := getPVReferencedVACNames(pv)
for _, vacName := range vacNames {
c.queue.Add(vacName)
}
}
// getPVCReferencedVACNames returns a list of VAC names that are referenced by the PVC.
func getPVCReferencedVACNames(pvc *v1.PersistentVolumeClaim) []string {
keys := sets.New[string]()
vacName := ptr.Deref(pvc.Spec.VolumeAttributesClassName, "")
if vacName != "" {
keys.Insert(vacName)
}
vacName = ptr.Deref(pvc.Status.CurrentVolumeAttributesClassName, "")
if vacName != "" {
keys.Insert(vacName)
}
status := pvc.Status.ModifyVolumeStatus
if status != nil && status.TargetVolumeAttributesClassName != "" {
keys.Insert(status.TargetVolumeAttributesClassName)
}
return keys.UnsortedList()
}
// getPVReferencedVACNames returns a list of VAC names that are referenced by the PV.
func getPVReferencedVACNames(pv *v1.PersistentVolume) []string {
result := []string{}
vacName := ptr.Deref(pv.Spec.VolumeAttributesClassName, "")
if vacName != "" {
result = append(result, vacName)
}
return result
}

View File

@ -0,0 +1,355 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vacprotection
import (
"context"
"errors"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
var (
vacGVR = schema.GroupVersionResource{
Group: storagev1beta1.GroupName,
Version: "v1beta1",
Resource: "volumeattributesclasses",
}
vac1 = protectionutil.MakeVolumeAttributesClass().Name("vac1").Obj()
vac1WithFinalizer = protectionutil.MakeVolumeAttributesClass().Name("vac1").Finalizer(volumeutil.VACProtectionFinalizer).Obj()
vac1TerminatingWithFinalizer = protectionutil.MakeVolumeAttributesClass().Name("vac1").Finalizer(volumeutil.VACProtectionFinalizer).Terminating().Obj()
vac1Terminating = protectionutil.MakeVolumeAttributesClass().Name("vac1").Terminating().Obj()
pv1WithVAC1 = protectionutil.MakePersistentVolume().Name("pv1").VolumeAttributesClassName("vac1").Obj()
pv1WithVAC2 = protectionutil.MakePersistentVolume().Name("pv1").VolumeAttributesClassName("vac2").Obj()
pv2WithVAC1 = protectionutil.MakePersistentVolume().Name("pv2").VolumeAttributesClassName("vac1").Obj()
pvc1WithVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac1").Obj()
pvc1WithVAC2 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").Obj()
pvc1WithVAC2CurrentVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").CurrentVolumeAttributesClassName("vac1").Obj()
pvc1WithVAC2TargetVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").TargetVolumeAttributesClassName("vac1").Obj()
pvc2WithVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc2").VolumeAttributesClassName("vac1").Obj()
)
type reaction struct {
verb string
resource string
reactorfn clienttesting.ReactionFunc
}
func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
i := 0
return func(action clienttesting.Action) (bool, runtime.Object, error) {
i++
if i <= failures {
// Update fails
update, ok := action.(clienttesting.UpdateAction)
if !ok {
t.Fatalf("Reactor got non-update action: %+v", action)
}
acc, _ := meta.Accessor(update.GetObject())
return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error"))
}
// Update succeeds
return false, nil, nil
}
}
func TestVACProtectionController(t *testing.T) {
tests := []struct {
name string
// Object to insert into fake kubeclient before the test starts.
initialObjects []runtime.Object
// Optional client reactors.
reactors []reaction
// VAC event to simulate. This VAC will be automatically added to
// initialObjects.
updatedVAC *storagev1beta1.VolumeAttributesClass
// PV event to simulate. The updatedPV will be automatically added to
// initialObjects.
oldPV *v1.PersistentVolume
updatedPV *v1.PersistentVolume
// PVC event to simulate. The updatedPVC will be automatically added to
// initialObjects.
oldPVC *v1.PersistentVolumeClaim
updatedPVC *v1.PersistentVolumeClaim
// List of expected kubeclient actions that should happen during the
// test.
expectedActions []clienttesting.Action
}{
// VAC events
//
{
name: "VAC without finalizer -> finalizer is added",
updatedVAC: vac1,
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer),
},
},
{
name: "VAC with finalizer -> no action",
updatedVAC: vac1WithFinalizer,
expectedActions: []clienttesting.Action{},
},
{
name: "saving VAC finalizer fails -> controller retries",
updatedVAC: vac1,
reactors: []reaction{
{
verb: "update",
resource: "volumeattributesclasses",
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
},
},
expectedActions: []clienttesting.Action{
// This fails
clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer),
// This fails too
clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer),
// This succeeds
clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer),
},
},
{
name: "deleted VAC with finalizer -> finalizer is removed",
updatedVAC: vac1TerminatingWithFinalizer,
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
},
},
{
name: "finalizer removal fails -> controller retries",
updatedVAC: vac1TerminatingWithFinalizer,
reactors: []reaction{
{
verb: "update",
resource: "volumeattributesclasses",
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
},
},
expectedActions: []clienttesting.Action{
// Fails
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
// Fails too
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
// Succeeds
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
},
},
{
name: "deleted VAC with finalizer but it's referenced by a PV -> finalizer is not removed",
initialObjects: []runtime.Object{pv1WithVAC1},
updatedVAC: vac1TerminatingWithFinalizer,
expectedActions: []clienttesting.Action{},
},
{
name: "deleted VAC with finalizer but it's referenced by a PVC -> finalizer is not removed",
initialObjects: []runtime.Object{pvc1WithVAC1},
updatedVAC: vac1TerminatingWithFinalizer,
expectedActions: []clienttesting.Action{},
},
// PV events
//
{
name: "pv changes vac and deleted VAC with finalizer -> finalizer is removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer},
oldPV: pv1WithVAC1,
updatedPV: pv1WithVAC2,
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
},
},
{
name: "pv is deleted and deleted VAC with finalizer -> finalizer is removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer},
oldPV: pv1WithVAC1,
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
},
},
{
name: "pv is deleted but other pv still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pv2WithVAC1},
oldPV: pv1WithVAC1,
expectedActions: []clienttesting.Action{},
},
{
name: "pv is deleted but pvc still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pvc1WithVAC1},
oldPV: pv1WithVAC1,
expectedActions: []clienttesting.Action{},
},
// PVC events
//
{
name: "pvc changes vac and deleted VAC with finalizer -> finalizer is removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer},
oldPVC: pvc1WithVAC1,
updatedPVC: pvc1WithVAC2,
expectedActions: []clienttesting.Action{
clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating),
},
},
{
name: "pvc changes vac but its status still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer},
oldPVC: pvc1WithVAC1,
updatedPVC: pvc1WithVAC2CurrentVAC1,
expectedActions: []clienttesting.Action{},
},
{
name: "pvc changes vac but its target status still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer},
oldPVC: pvc1WithVAC1,
updatedPVC: pvc1WithVAC2TargetVAC1,
expectedActions: []clienttesting.Action{},
},
{
name: "pvc is deleted but other pvc still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pvc2WithVAC1},
oldPVC: pvc1WithVAC1,
expectedActions: []clienttesting.Action{},
},
{
name: "pvc is deleted but pv still holds terminating vac -> finalizer is not removed",
initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pv1WithVAC1},
oldPVC: pvc1WithVAC1,
expectedActions: []clienttesting.Action{},
},
}
for _, test := range tests {
// Create client with initial data
objs := test.initialObjects
if test.updatedVAC != nil {
objs = append(objs, test.updatedVAC)
}
if test.updatedPV != nil {
objs = append(objs, test.updatedPV)
}
if test.updatedPVC != nil {
objs = append(objs, test.updatedPVC)
}
client := fake.NewSimpleClientset(objs...)
// Create informers
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
pvcInformer := informers.Core().V1().PersistentVolumeClaims()
pvInformer := informers.Core().V1().PersistentVolumes()
vacInformer := informers.Storage().V1beta1().VolumeAttributesClasses()
// Populate the informers with initial objects so the controller can
// Get() it.
for _, obj := range objs {
switch obj.(type) {
case *v1.PersistentVolumeClaim:
require.NoError(t, pvcInformer.Informer().GetStore().Add(obj), "failed to add object to PVC informer")
case *v1.PersistentVolume:
require.NoError(t, pvInformer.Informer().GetStore().Add(obj), "failed to add object to PV informer")
case *storagev1beta1.VolumeAttributesClass:
require.NoError(t, vacInformer.Informer().GetStore().Add(obj), "failed to add object to VAC informer")
default:
t.Fatalf("Unknown initialObject type: %+v", obj)
}
}
// Add reactor to inject test errors.
for _, reactor := range test.reactors {
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
}
// Create the controller
logger, _ := ktesting.NewTestContext(t)
ctrl, err := NewVACProtectionController(logger, client, pvcInformer, pvInformer, vacInformer)
require.NoError(t, err, "failed to create controller")
// Start the test by simulating an event
if test.updatedVAC != nil {
ctrl.vacAddedUpdated(logger, test.updatedVAC)
}
if test.updatedPV != nil {
ctrl.pvUpdated(logger, test.oldPV, test.updatedPV)
} else if test.oldPV != nil {
ctrl.pvDeleted(logger, test.oldPV)
}
if test.updatedPVC != nil {
ctrl.pvcUpdated(logger, test.oldPVC, test.updatedPVC)
} else if test.oldPVC != nil {
ctrl.pvcDeleted(logger, test.oldPVC)
}
// Process the controller queue until we get expected results
timeout := time.Now().Add(10 * time.Second)
lastReportedActionCount := 0
for {
if time.Now().After(timeout) {
t.Errorf("Test %q: timed out", test.name)
break
}
if ctrl.queue.Len() > 0 {
logger.V(5).Info("Non-empty events queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len())
ctrl.processNextWorkItem(context.TODO())
}
if ctrl.queue.Len() > 0 {
// There is still some work in the queue, process it now
continue
}
currentActionCount := len(client.Actions())
if currentActionCount < len(test.expectedActions) {
// Do not log evey wait, only when the action count changes.
if lastReportedActionCount < currentActionCount {
logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions))
lastReportedActionCount = currentActionCount
}
// The test expected more to happen, wait for the actions.
// Most probably it's exponential backoff
time.Sleep(10 * time.Millisecond)
continue
}
break
}
actions := client.Actions()
if !reflect.DeepEqual(actions, test.expectedActions) {
t.Errorf("Test %q: action not expected\nExpected:\n%s\ngot:\n%s", test.name, dump.Pretty(test.expectedActions), dump.Pretty(actions))
}
}
}

View File

@ -22,4 +22,7 @@ const (
// PVProtectionFinalizer is the name of finalizer on PVs that are bound by PVCs
PVProtectionFinalizer = "kubernetes.io/pv-protection"
// VACProtectionFinalizer is the name of finalizer on VACs that are used by PVs or PVCs
VACProtectionFinalizer = "kubernetes.io/vac-protection"
)

View File

@ -427,6 +427,18 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
eventsRule(),
},
})
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "volumeattributesclass-protection-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("list", "watch", "get").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(),
rbacv1helpers.NewRule("list", "watch", "get").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch", "update").Groups(storageGroup).Resources("volumeattributesclasses").RuleOrDie(),
eventsRule(),
},
})
}
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-after-finished-controller"},
Rules: []rbacv1.PolicyRule{