diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2c4782a18ba..e75edc50bd2 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -567,6 +567,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor { register(newClusterRoleAggregrationControllerDescriptor()) register(newPersistentVolumeClaimProtectionControllerDescriptor()) register(newPersistentVolumeProtectionControllerDescriptor()) + register(newVolumeAttributesClassProtectionControllerDescriptor()) register(newTTLAfterFinishedControllerDescriptor()) register(newRootCACertificatePublisherControllerDescriptor()) register(newEphemeralVolumeControllerDescriptor()) diff --git a/cmd/kube-controller-manager/app/controllermanager_test.go b/cmd/kube-controller-manager/app/controllermanager_test.go index c20cd65f424..0b1aeb6e9d1 100644 --- a/cmd/kube-controller-manager/app/controllermanager_test.go +++ b/cmd/kube-controller-manager/app/controllermanager_test.go @@ -86,6 +86,7 @@ func TestControllerNamesDeclaration(t *testing.T) { names.ClusterRoleAggregationController, names.PersistentVolumeClaimProtectionController, names.PersistentVolumeProtectionController, + names.VolumeAttributesClassProtectionController, names.TTLAfterFinishedController, names.RootCACertificatePublisherController, names.EphemeralVolumeController, diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index a2b0660627c..61c9c296f02 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -64,6 +64,7 @@ import ( persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/controller/volume/pvcprotection" "k8s.io/kubernetes/pkg/controller/volume/pvprotection" + "k8s.io/kubernetes/pkg/controller/volume/vacprotection" "k8s.io/kubernetes/pkg/features" quotainstall "k8s.io/kubernetes/pkg/quota/v1/install" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -684,6 +685,31 @@ func startPersistentVolumeProtectionController(ctx context.Context, controllerCo return nil, true, nil } +func newVolumeAttributesClassProtectionControllerDescriptor() *ControllerDescriptor { + return &ControllerDescriptor{ + name: names.VolumeAttributesClassProtectionController, + initFunc: startVolumeAttributesClassProtectionController, + requiredFeatureGates: []featuregate.Feature{ + features.VolumeAttributesClass, + }, + } +} + +func startVolumeAttributesClassProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { + vacProtectionController, err := vacprotection.NewVACProtectionController( + klog.FromContext(ctx), + controllerContext.ClientBuilder.ClientOrDie("volumeattributesclass-protection-controller"), + controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), + controllerContext.InformerFactory.Core().V1().PersistentVolumes(), + controllerContext.InformerFactory.Storage().V1beta1().VolumeAttributesClasses(), + ) + if err != nil { + return nil, true, fmt.Errorf("failed to start the vac protection controller: %w", err) + } + go vacProtectionController.Run(ctx, 1) + return nil, true, nil +} + func newTTLAfterFinishedControllerDescriptor() *ControllerDescriptor { return &ControllerDescriptor{ name: names.TTLAfterFinishedController, diff --git a/cmd/kube-controller-manager/names/controller_names.go b/cmd/kube-controller-manager/names/controller_names.go index 11597628474..bcdea21bbae 100644 --- a/cmd/kube-controller-manager/names/controller_names.go +++ b/cmd/kube-controller-manager/names/controller_names.go @@ -82,6 +82,7 @@ const ( ResourceClaimController = "resourceclaim-controller" LegacyServiceAccountTokenCleanerController = "legacy-serviceaccount-token-cleaner-controller" ValidatingAdmissionPolicyStatusController = "validatingadmissionpolicy-status-controller" + VolumeAttributesClassProtectionController = "volumeattributesclass-protection-controller" ServiceCIDRController = "service-cidr-controller" StorageVersionMigratorController = "storage-version-migrator-controller" ) diff --git a/pkg/controller/volume/protectionutil/wrappers.go b/pkg/controller/volume/protectionutil/wrappers.go new file mode 100644 index 00000000000..aba1e7c6026 --- /dev/null +++ b/pkg/controller/volume/protectionutil/wrappers.go @@ -0,0 +1,216 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protectionutil + +import ( + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// PodWrapper wraps a Pod inside. +type PodWrapper struct{ v1.Pod } + +// MakePod creates a Pod wrapper. +func MakePod() *PodWrapper { + return &PodWrapper{v1.Pod{}} +} + +// Obj returns the inner Pod. +func (p *PodWrapper) Obj() *v1.Pod { + return &p.Pod +} + +// Name sets `s` as the name of the inner pod. +func (p *PodWrapper) Name(s string) *PodWrapper { + p.SetName(s) + return p +} + +// UID sets `s` as the UID of the inner pod. +func (p *PodWrapper) UID(s string) *PodWrapper { + p.SetUID(types.UID(s)) + return p +} + +// SchedulerName sets `s` as the scheduler name of the inner pod. +func (p *PodWrapper) SchedulerName(s string) *PodWrapper { + p.Spec.SchedulerName = s + return p +} + +// Namespace sets `s` as the namespace of the inner pod. +func (p *PodWrapper) Namespace(s string) *PodWrapper { + p.SetNamespace(s) + return p +} + +// Terminating sets the inner pod's deletionTimestamp to current timestamp. +func (p *PodWrapper) Terminating() *PodWrapper { + now := metav1.Now() + p.DeletionTimestamp = &now + return p +} + +// PVC creates a Volume with a PVC and injects into the inner pod. +func (p *PodWrapper) PVC(name string) *PodWrapper { + p.Spec.Volumes = append(p.Spec.Volumes, v1.Volume{ + Name: name, + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: name}, + }, + }) + return p +} + +// Annotation sets a {k,v} pair to the inner pod annotation. +func (p *PodWrapper) Annotation(key, value string) *PodWrapper { + metav1.SetMetaDataAnnotation(&p.ObjectMeta, key, value) + return p +} + +// Annotations sets all {k,v} pair provided by `annotations` to the inner pod annotations. +func (p *PodWrapper) Annotations(annotations map[string]string) *PodWrapper { + for k, v := range annotations { + p.Annotation(k, v) + } + return p +} + +// PersistentVolumeClaimWrapper wraps a PersistentVolumeClaim inside. +type PersistentVolumeClaimWrapper struct{ v1.PersistentVolumeClaim } + +// MakePersistentVolumeClaim creates a PersistentVolumeClaim wrapper. +func MakePersistentVolumeClaim() *PersistentVolumeClaimWrapper { + return &PersistentVolumeClaimWrapper{} +} + +// Obj returns the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) Obj() *v1.PersistentVolumeClaim { + return &p.PersistentVolumeClaim +} + +// Name sets `s` as the name of the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) Name(s string) *PersistentVolumeClaimWrapper { + p.SetName(s) + return p +} + +// Namespace sets `s` as the namespace of the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) Namespace(s string) *PersistentVolumeClaimWrapper { + p.SetNamespace(s) + return p +} + +// Annotation sets a {k,v} pair to the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) Annotation(key, value string) *PersistentVolumeClaimWrapper { + metav1.SetMetaDataAnnotation(&p.ObjectMeta, key, value) + return p +} + +// VolumeName sets `name` as the volume name of the inner +// PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) VolumeName(name string) *PersistentVolumeClaimWrapper { + p.PersistentVolumeClaim.Spec.VolumeName = name + return p +} + +func (p *PersistentVolumeClaimWrapper) Finalizer(s string) *PersistentVolumeClaimWrapper { + p.Finalizers = append(p.Finalizers, s) + return p +} + +// VolumeAttributesClassName sets `s` as the VolumeAttributesClassName of the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) VolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper { + p.Spec.VolumeAttributesClassName = &s + return p +} + +// CurrentVolumeAttributesClassName sets `s` as the CurrentVolumeAttributesClassName of the inner PersistentVolumeClaim. +func (p *PersistentVolumeClaimWrapper) CurrentVolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper { + p.Status.CurrentVolumeAttributesClassName = &s + return p +} + +// TargetVolumeAttributesClassName sets `s` as the TargetVolumeAttributesClassName of the inner PersistentVolumeClaim. +// It also sets the status to Pending. +func (p *PersistentVolumeClaimWrapper) TargetVolumeAttributesClassName(s string) *PersistentVolumeClaimWrapper { + p.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{ + TargetVolumeAttributesClassName: s, + Status: v1.PersistentVolumeClaimModifyVolumePending, + } + return p +} + +// PersistentVolumeWrapper wraps a PersistentVolume inside. +type PersistentVolumeWrapper struct{ v1.PersistentVolume } + +// MakePersistentVolume creates a PersistentVolume wrapper. +func MakePersistentVolume() *PersistentVolumeWrapper { + return &PersistentVolumeWrapper{} +} + +// Obj returns the inner PersistentVolume. +func (p *PersistentVolumeWrapper) Obj() *v1.PersistentVolume { + return &p.PersistentVolume +} + +// Name sets `s` as the name of the inner PersistentVolume. +func (p *PersistentVolumeWrapper) Name(s string) *PersistentVolumeWrapper { + p.SetName(s) + return p +} + +// VolumeAttributesClassName sets `s` as the VolumeAttributesClassName of the inner PersistentVolume. +func (p *PersistentVolumeWrapper) VolumeAttributesClassName(s string) *PersistentVolumeWrapper { + p.Spec.VolumeAttributesClassName = &s + return p +} + +// VolumeAttributesClassWrapper wraps a VolumeAttributesClass inside. +type VolumeAttributesClassWrapper struct { + storagev1beta1.VolumeAttributesClass +} + +// MakeVolumeAttributesClass creates a VolumeAttributesClass wrapper. +func MakeVolumeAttributesClass() *VolumeAttributesClassWrapper { + return &VolumeAttributesClassWrapper{} +} + +// Obj returns the inner VolumeAttributesClass. +func (v *VolumeAttributesClassWrapper) Obj() *storagev1beta1.VolumeAttributesClass { + return &v.VolumeAttributesClass +} + +// Name sets `s` as the name of the inner VolumeAttributesClass. +func (v *VolumeAttributesClassWrapper) Name(s string) *VolumeAttributesClassWrapper { + v.SetName(s) + return v +} + +// Terminating sets the inner VolumeAttributesClass' deletionTimestamp to non-nil. +func (v *VolumeAttributesClassWrapper) Terminating() *VolumeAttributesClassWrapper { + v.DeletionTimestamp = &metav1.Time{} + return v +} + +// Finalizer appends `s` to the finalizers of the inner VolumeAttributesClass. +func (v *VolumeAttributesClassWrapper) Finalizer(s string) *VolumeAttributesClassWrapper { + v.Finalizers = append(v.Finalizers, s) + return v +} diff --git a/pkg/controller/volume/vacprotection/vac_protection_controller.go b/pkg/controller/volume/vacprotection/vac_protection_controller.go new file mode 100644 index 00000000000..550d379658a --- /dev/null +++ b/pkg/controller/volume/vacprotection/vac_protection_controller.go @@ -0,0 +1,437 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vacprotection + +import ( + "context" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + storageinformers "k8s.io/client-go/informers/storage/v1beta1" + clientset "k8s.io/client-go/kubernetes" + storagelisters "k8s.io/client-go/listers/storage/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/volume/protectionutil" + "k8s.io/kubernetes/pkg/util/slice" + volumeutil "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/utils/ptr" +) + +const ( + vacNameKeyIndex = "volumeAttributesClassName" +) + +// Controller is controller that adds and removes VACProtectionFinalizer +// from VACs that are not used by any PV or PVC. +// +// This controller only use informers, so it may remove the finalizer too early. +// +// One scenario is: +// 1. There is a VolumeAttributesClass that is not used by any PVC. This +// VolumeAttributesClass is synced to all informers (external-provisioner, +// external-resizer, KCM) +// +// 2. At the same time: +// +// * User creates a PVC that uses this VolumeAttributesClass. +// +// * Another user deletes the VolumeAttributesClass. +// +// 3. VolumeAttributesClass deletion event with DeletionTimestamp reaches +// this controller. Because the PVC creation event has not yet +// reached KCM informers, the controller lets the VolumeAttributesClass +// to be deleted by removing the finalizer. PVC creation event reaches +// the external-provisioner, before VolumeAttributesClass update. The +// external-provisioner will try to provision a new volume using the +// VolumeAttributesClass that will get deleted soon. +// +// * If the external-provisioner gets the VolumeAttributesClass before +// deletion in the informer, the provisioning will succeed. +// +// * Otherwise the external-prosivioner will fail the provisioning. +// +// Solving this scenario properly requires to Get/List requests to the API server, +// which will cause performance issue in larger cluster similar to the existing +// PVC protection controller - related issue https://github.com/kubernetes/kubernetes/issues/109282 + +type Controller struct { + client clientset.Interface + + pvcSynced cache.InformerSynced + pvSynced cache.InformerSynced + vacLister storagelisters.VolumeAttributesClassLister + vacSynced cache.InformerSynced + + getPVsAssignedToVAC func(vacName string) ([]*v1.PersistentVolume, error) + getPVCsAssignedToVAC func(vacName string) ([]*v1.PersistentVolumeClaim, error) + + queue workqueue.TypedRateLimitingInterface[string] +} + +// NewVACProtectionController returns a new *Controller. +func NewVACProtectionController(logger klog.Logger, + client clientset.Interface, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + pvInformer coreinformers.PersistentVolumeInformer, + vacInformer storageinformers.VolumeAttributesClassInformer) (*Controller, error) { + c := &Controller{ + client: client, + pvcSynced: pvcInformer.Informer().HasSynced, + pvSynced: pvInformer.Informer().HasSynced, + vacLister: vacInformer.Lister(), + vacSynced: vacInformer.Informer().HasSynced, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "vacprotection", + }, + ), + } + + _, _ = vacInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.vacAddedUpdated(logger, obj) + }, + UpdateFunc: func(old, new interface{}) { + c.vacAddedUpdated(logger, new) + }, + }) + + err := pvInformer.Informer().AddIndexers(cache.Indexers{vacNameKeyIndex: func(obj interface{}) ([]string, error) { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + return []string{}, nil + } + return getPVReferencedVACNames(pv), nil + }}) + if err != nil { + return nil, fmt.Errorf("failed to add index to PV informer: %w", err) + } + + pvIndexer := pvInformer.Informer().GetIndexer() + c.getPVsAssignedToVAC = func(vacName string) ([]*v1.PersistentVolume, error) { + objs, err := pvIndexer.ByIndex(vacNameKeyIndex, vacName) + if err != nil { + return nil, err + } + pvcs := make([]*v1.PersistentVolume, 0, len(objs)) + for _, obj := range objs { + pvc, ok := obj.(*v1.PersistentVolume) + if !ok { + continue + } + pvcs = append(pvcs, pvc) + } + return pvcs, nil + } + + err = pvcInformer.Informer().AddIndexers(cache.Indexers{vacNameKeyIndex: func(obj interface{}) ([]string, error) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return []string{}, nil + } + return getPVCReferencedVACNames(pvc), nil + }}) + if err != nil { + return nil, fmt.Errorf("failed to add index to PVC informer: %w", err) + } + + pvcIndexer := pvcInformer.Informer().GetIndexer() + c.getPVCsAssignedToVAC = func(vacName string) ([]*v1.PersistentVolumeClaim, error) { + objs, err := pvcIndexer.ByIndex(vacNameKeyIndex, vacName) + if err != nil { + return nil, err + } + pvcs := make([]*v1.PersistentVolumeClaim, 0, len(objs)) + for _, obj := range objs { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + continue + } + pvcs = append(pvcs, pvc) + } + return pvcs, nil + } + + _, _ = pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + c.pvcUpdated(logger, old, new) + }, + DeleteFunc: func(obj interface{}) { + c.pvcDeleted(logger, obj) + }, + }) + + _, _ = pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + c.pvUpdated(logger, old, new) + }, + DeleteFunc: func(obj interface{}) { + c.pvDeleted(logger, obj) + }, + }) + return c, nil +} + +// Run runs the controller goroutines. +func (c *Controller) Run(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + logger := klog.FromContext(ctx) + logger.Info("Starting VAC protection controller") + defer logger.Info("Shutting down VAC protection controller") + + if !cache.WaitForNamedCacheSync("VAC protection", ctx.Done(), c.pvSynced, c.pvcSynced, c.vacSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + + <-ctx.Done() +} + +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit. +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + vacKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(vacKey) + + err := c.processVAC(ctx, vacKey) + if err == nil { + c.queue.Forget(vacKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("VAC %v failed with : %w", vacKey, err)) + c.queue.AddRateLimited(vacKey) + + return true +} + +func (c *Controller) processVAC(ctx context.Context, vacName string) error { + logger := klog.FromContext(ctx) + logger.V(4).Info("Processing VAC", "VAC", klog.KRef("", vacName)) + startTime := time.Now() + defer func() { + logger.V(4).Info("Finished processing VAC", "VAC", klog.KRef("", vacName), "cost", time.Since(startTime)) + }() + + vac, err := c.vacLister.Get(vacName) + if err != nil { + if apierrors.IsNotFound(err) { + logger.V(4).Info("VAC not found, ignoring", "VAC", klog.KRef("", vacName)) + return nil + } + return err + } + + if protectionutil.IsDeletionCandidate(vac, volumeutil.VACProtectionFinalizer) { + // VAC should be deleted. Check if it's used and remove finalizer if + // it's not. + isUsed := c.isBeingUsed(ctx, vac) + if !isUsed { + return c.removeFinalizer(ctx, vac) + } + logger.V(4).Info("Keeping VAC because it is being used", "PVC", klog.KRef("", vacName)) + } + + if protectionutil.NeedToAddFinalizer(vac, volumeutil.VACProtectionFinalizer) { + return c.addFinalizer(ctx, vac) + } + + return nil +} + +func (c *Controller) addFinalizer(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) error { + vacClone := vac.DeepCopy() + vacClone.ObjectMeta.Finalizers = append(vacClone.ObjectMeta.Finalizers, volumeutil.VACProtectionFinalizer) + _, err := c.client.StorageV1beta1().VolumeAttributesClasses().Update(ctx, vacClone, metav1.UpdateOptions{}) + logger := klog.FromContext(ctx) + if err != nil { + logger.V(3).Info("Error adding protection finalizer to VAC", "VAC", klog.KObj(vac), "err", err) + return err + } + logger.V(3).Info("Added protection finalizer to VAC", "VAC", klog.KObj(vac)) + return nil +} + +func (c *Controller) removeFinalizer(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) error { + vacClone := vac.DeepCopy() + vacClone.ObjectMeta.Finalizers = slice.RemoveString(vacClone.ObjectMeta.Finalizers, volumeutil.VACProtectionFinalizer, nil) + _, err := c.client.StorageV1beta1().VolumeAttributesClasses().Update(ctx, vacClone, metav1.UpdateOptions{}) + logger := klog.FromContext(ctx) + if err != nil { + logger.V(3).Info("Error removing protection finalizer from VAC", "VAC", klog.KObj(vac), "err", err) + return err + } + logger.V(3).Info("Removed protection finalizer from VAC", "VAC", klog.KObj(vac)) + return nil +} + +func (c *Controller) isBeingUsed(ctx context.Context, vac *storagev1beta1.VolumeAttributesClass) bool { + logger := klog.FromContext(ctx) + + pvs, err := c.getPVsAssignedToVAC(vac.Name) + if err != nil { + logger.Error(err, "Error getting PVs assigned to VAC", "VAC", klog.KObj(vac)) + return true + } + if len(pvs) > 0 { + return true + } + + pvcs, err := c.getPVCsAssignedToVAC(vac.Name) + if err != nil { + logger.Error(err, "Error getting PVCs assigned to VAC", "VAC", klog.KObj(vac)) + return true + } + if len(pvcs) > 0 { + return true + } + return false +} + +// pvAddedUpdated reacts to vac added/updated events +func (c *Controller) vacAddedUpdated(logger klog.Logger, obj interface{}) { + vac, ok := obj.(*storagev1beta1.VolumeAttributesClass) + if !ok { + utilruntime.HandleError(fmt.Errorf("VAC informer returned non-VAC object: %#v", obj)) + return + } + logger.V(4).Info("Got event on VAC", "VAC", klog.KObj(vac)) + + if protectionutil.NeedToAddFinalizer(vac, volumeutil.VACProtectionFinalizer) || protectionutil.IsDeletionCandidate(vac, volumeutil.VACProtectionFinalizer) { + c.queue.Add(vac.Name) + } +} + +// pvcDeleted reacts to pvc deleted events +func (c *Controller) pvcDeleted(logger klog.Logger, obj interface{}) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj)) + return + } + logger.V(4).Info("Got event on PVC", "PVC", klog.KObj(pvc)) + vacNames := getPVCReferencedVACNames(pvc) + for _, vacName := range vacNames { + c.queue.Add(vacName) + } +} + +// pvcUpdated reacts to pvc updated events +func (c *Controller) pvcUpdated(logger klog.Logger, old, new interface{}) { + oldPVC, ok := old.(*v1.PersistentVolumeClaim) + if !ok { + utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", old)) + return + } + newPVC, ok := new.(*v1.PersistentVolumeClaim) + if !ok { + utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", new)) + return + } + + logger.V(4).Info("Got event on PVC", "PVC", klog.KObj(newPVC)) + + vavNames := sets.New(getPVCReferencedVACNames(oldPVC)...).Delete(getPVCReferencedVACNames(newPVC)...).UnsortedList() + for _, vacName := range vavNames { + c.queue.Add(vacName) + } +} + +// pvUpdated reacts to pv updated events +func (c *Controller) pvUpdated(logger klog.Logger, old, new interface{}) { + oldPV, ok := old.(*v1.PersistentVolume) + if !ok { + utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", old)) + return + } + newPV, ok := new.(*v1.PersistentVolume) + if !ok { + utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", new)) + return + } + + logger.V(4).Info("Got event on PV", "PV", klog.KObj(newPV)) + vavNames := sets.New(getPVReferencedVACNames(oldPV)...).Delete(getPVReferencedVACNames(newPV)...).UnsortedList() + for _, vacName := range vavNames { + c.queue.Add(vacName) + } +} + +// pvDeleted reacts to pv deleted events +func (c *Controller) pvDeleted(logger klog.Logger, obj interface{}) { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj)) + return + } + logger.V(4).Info("Got event on PV", "PV", klog.KObj(pv)) + vacNames := getPVReferencedVACNames(pv) + for _, vacName := range vacNames { + c.queue.Add(vacName) + } +} + +// getPVCReferencedVACNames returns a list of VAC names that are referenced by the PVC. +func getPVCReferencedVACNames(pvc *v1.PersistentVolumeClaim) []string { + keys := sets.New[string]() + vacName := ptr.Deref(pvc.Spec.VolumeAttributesClassName, "") + if vacName != "" { + keys.Insert(vacName) + } + vacName = ptr.Deref(pvc.Status.CurrentVolumeAttributesClassName, "") + if vacName != "" { + keys.Insert(vacName) + } + status := pvc.Status.ModifyVolumeStatus + if status != nil && status.TargetVolumeAttributesClassName != "" { + keys.Insert(status.TargetVolumeAttributesClassName) + } + return keys.UnsortedList() +} + +// getPVReferencedVACNames returns a list of VAC names that are referenced by the PV. +func getPVReferencedVACNames(pv *v1.PersistentVolume) []string { + result := []string{} + vacName := ptr.Deref(pv.Spec.VolumeAttributesClassName, "") + if vacName != "" { + result = append(result, vacName) + } + return result +} diff --git a/pkg/controller/volume/vacprotection/vac_protection_controller_test.go b/pkg/controller/volume/vacprotection/vac_protection_controller_test.go new file mode 100644 index 00000000000..777dd987f14 --- /dev/null +++ b/pkg/controller/volume/vacprotection/vac_protection_controller_test.go @@ -0,0 +1,355 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vacprotection + +import ( + "context" + "errors" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/dump" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/volume/protectionutil" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +var ( + vacGVR = schema.GroupVersionResource{ + Group: storagev1beta1.GroupName, + Version: "v1beta1", + Resource: "volumeattributesclasses", + } + + vac1 = protectionutil.MakeVolumeAttributesClass().Name("vac1").Obj() + vac1WithFinalizer = protectionutil.MakeVolumeAttributesClass().Name("vac1").Finalizer(volumeutil.VACProtectionFinalizer).Obj() + vac1TerminatingWithFinalizer = protectionutil.MakeVolumeAttributesClass().Name("vac1").Finalizer(volumeutil.VACProtectionFinalizer).Terminating().Obj() + vac1Terminating = protectionutil.MakeVolumeAttributesClass().Name("vac1").Terminating().Obj() + + pv1WithVAC1 = protectionutil.MakePersistentVolume().Name("pv1").VolumeAttributesClassName("vac1").Obj() + pv1WithVAC2 = protectionutil.MakePersistentVolume().Name("pv1").VolumeAttributesClassName("vac2").Obj() + pv2WithVAC1 = protectionutil.MakePersistentVolume().Name("pv2").VolumeAttributesClassName("vac1").Obj() + + pvc1WithVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac1").Obj() + pvc1WithVAC2 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").Obj() + pvc1WithVAC2CurrentVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").CurrentVolumeAttributesClassName("vac1").Obj() + pvc1WithVAC2TargetVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc1").VolumeAttributesClassName("vac2").TargetVolumeAttributesClassName("vac1").Obj() + pvc2WithVAC1 = protectionutil.MakePersistentVolumeClaim().Name("pvc2").VolumeAttributesClassName("vac1").Obj() +) + +type reaction struct { + verb string + resource string + reactorfn clienttesting.ReactionFunc +} + +func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + if i <= failures { + // Update fails + update, ok := action.(clienttesting.UpdateAction) + + if !ok { + t.Fatalf("Reactor got non-update action: %+v", action) + } + acc, _ := meta.Accessor(update.GetObject()) + return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error")) + } + // Update succeeds + return false, nil, nil + } +} + +func TestVACProtectionController(t *testing.T) { + tests := []struct { + name string + // Object to insert into fake kubeclient before the test starts. + initialObjects []runtime.Object + // Optional client reactors. + reactors []reaction + + // VAC event to simulate. This VAC will be automatically added to + // initialObjects. + updatedVAC *storagev1beta1.VolumeAttributesClass + + // PV event to simulate. The updatedPV will be automatically added to + // initialObjects. + oldPV *v1.PersistentVolume + updatedPV *v1.PersistentVolume + + // PVC event to simulate. The updatedPVC will be automatically added to + // initialObjects. + oldPVC *v1.PersistentVolumeClaim + updatedPVC *v1.PersistentVolumeClaim + + // List of expected kubeclient actions that should happen during the + // test. + expectedActions []clienttesting.Action + }{ + // VAC events + // + { + name: "VAC without finalizer -> finalizer is added", + updatedVAC: vac1, + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer), + }, + }, + { + name: "VAC with finalizer -> no action", + updatedVAC: vac1WithFinalizer, + expectedActions: []clienttesting.Action{}, + }, + { + name: "saving VAC finalizer fails -> controller retries", + updatedVAC: vac1, + reactors: []reaction{ + { + verb: "update", + resource: "volumeattributesclasses", + reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/), + }, + }, + expectedActions: []clienttesting.Action{ + // This fails + clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer), + // This fails too + clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer), + // This succeeds + clienttesting.NewUpdateAction(vacGVR, "", vac1WithFinalizer), + }, + }, + { + name: "deleted VAC with finalizer -> finalizer is removed", + updatedVAC: vac1TerminatingWithFinalizer, + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + }, + }, + { + name: "finalizer removal fails -> controller retries", + updatedVAC: vac1TerminatingWithFinalizer, + reactors: []reaction{ + { + verb: "update", + resource: "volumeattributesclasses", + reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/), + }, + }, + expectedActions: []clienttesting.Action{ + // Fails + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + // Fails too + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + // Succeeds + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + }, + }, + { + name: "deleted VAC with finalizer but it's referenced by a PV -> finalizer is not removed", + initialObjects: []runtime.Object{pv1WithVAC1}, + updatedVAC: vac1TerminatingWithFinalizer, + expectedActions: []clienttesting.Action{}, + }, + { + name: "deleted VAC with finalizer but it's referenced by a PVC -> finalizer is not removed", + initialObjects: []runtime.Object{pvc1WithVAC1}, + updatedVAC: vac1TerminatingWithFinalizer, + expectedActions: []clienttesting.Action{}, + }, + // PV events + // + { + name: "pv changes vac and deleted VAC with finalizer -> finalizer is removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer}, + oldPV: pv1WithVAC1, + updatedPV: pv1WithVAC2, + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + }, + }, + { + name: "pv is deleted and deleted VAC with finalizer -> finalizer is removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer}, + oldPV: pv1WithVAC1, + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + }, + }, + { + name: "pv is deleted but other pv still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pv2WithVAC1}, + oldPV: pv1WithVAC1, + expectedActions: []clienttesting.Action{}, + }, + { + name: "pv is deleted but pvc still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pvc1WithVAC1}, + oldPV: pv1WithVAC1, + expectedActions: []clienttesting.Action{}, + }, + // PVC events + // + { + name: "pvc changes vac and deleted VAC with finalizer -> finalizer is removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer}, + oldPVC: pvc1WithVAC1, + updatedPVC: pvc1WithVAC2, + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(vacGVR, "", vac1Terminating), + }, + }, + { + name: "pvc changes vac but its status still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer}, + oldPVC: pvc1WithVAC1, + updatedPVC: pvc1WithVAC2CurrentVAC1, + expectedActions: []clienttesting.Action{}, + }, + { + name: "pvc changes vac but its target status still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer}, + oldPVC: pvc1WithVAC1, + updatedPVC: pvc1WithVAC2TargetVAC1, + expectedActions: []clienttesting.Action{}, + }, + { + name: "pvc is deleted but other pvc still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pvc2WithVAC1}, + oldPVC: pvc1WithVAC1, + expectedActions: []clienttesting.Action{}, + }, + { + name: "pvc is deleted but pv still holds terminating vac -> finalizer is not removed", + initialObjects: []runtime.Object{vac1TerminatingWithFinalizer, pv1WithVAC1}, + oldPVC: pvc1WithVAC1, + expectedActions: []clienttesting.Action{}, + }, + } + + for _, test := range tests { + // Create client with initial data + objs := test.initialObjects + if test.updatedVAC != nil { + objs = append(objs, test.updatedVAC) + } + if test.updatedPV != nil { + objs = append(objs, test.updatedPV) + } + if test.updatedPVC != nil { + objs = append(objs, test.updatedPVC) + } + + client := fake.NewSimpleClientset(objs...) + + // Create informers + informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + pvcInformer := informers.Core().V1().PersistentVolumeClaims() + pvInformer := informers.Core().V1().PersistentVolumes() + vacInformer := informers.Storage().V1beta1().VolumeAttributesClasses() + + // Populate the informers with initial objects so the controller can + // Get() it. + for _, obj := range objs { + switch obj.(type) { + case *v1.PersistentVolumeClaim: + require.NoError(t, pvcInformer.Informer().GetStore().Add(obj), "failed to add object to PVC informer") + case *v1.PersistentVolume: + require.NoError(t, pvInformer.Informer().GetStore().Add(obj), "failed to add object to PV informer") + case *storagev1beta1.VolumeAttributesClass: + require.NoError(t, vacInformer.Informer().GetStore().Add(obj), "failed to add object to VAC informer") + default: + t.Fatalf("Unknown initialObject type: %+v", obj) + } + } + + // Add reactor to inject test errors. + for _, reactor := range test.reactors { + client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn) + } + + // Create the controller + logger, _ := ktesting.NewTestContext(t) + ctrl, err := NewVACProtectionController(logger, client, pvcInformer, pvInformer, vacInformer) + require.NoError(t, err, "failed to create controller") + + // Start the test by simulating an event + if test.updatedVAC != nil { + ctrl.vacAddedUpdated(logger, test.updatedVAC) + } + if test.updatedPV != nil { + ctrl.pvUpdated(logger, test.oldPV, test.updatedPV) + } else if test.oldPV != nil { + ctrl.pvDeleted(logger, test.oldPV) + } + if test.updatedPVC != nil { + ctrl.pvcUpdated(logger, test.oldPVC, test.updatedPVC) + } else if test.oldPVC != nil { + ctrl.pvcDeleted(logger, test.oldPVC) + } + + // Process the controller queue until we get expected results + timeout := time.Now().Add(10 * time.Second) + lastReportedActionCount := 0 + for { + if time.Now().After(timeout) { + t.Errorf("Test %q: timed out", test.name) + break + } + if ctrl.queue.Len() > 0 { + logger.V(5).Info("Non-empty events queue, processing one", "test", test.name, "queueLength", ctrl.queue.Len()) + ctrl.processNextWorkItem(context.TODO()) + } + if ctrl.queue.Len() > 0 { + // There is still some work in the queue, process it now + continue + } + currentActionCount := len(client.Actions()) + if currentActionCount < len(test.expectedActions) { + // Do not log evey wait, only when the action count changes. + if lastReportedActionCount < currentActionCount { + logger.V(5).Info("Waiting for the remaining actions", "test", test.name, "currentActionCount", currentActionCount, "expectedActionCount", len(test.expectedActions)) + lastReportedActionCount = currentActionCount + } + // The test expected more to happen, wait for the actions. + // Most probably it's exponential backoff + time.Sleep(10 * time.Millisecond) + continue + } + break + } + actions := client.Actions() + + if !reflect.DeepEqual(actions, test.expectedActions) { + t.Errorf("Test %q: action not expected\nExpected:\n%s\ngot:\n%s", test.name, dump.Pretty(test.expectedActions), dump.Pretty(actions)) + } + } +} diff --git a/pkg/volume/util/finalizer.go b/pkg/volume/util/finalizer.go index e1fdf5673c4..0c2b205eb12 100644 --- a/pkg/volume/util/finalizer.go +++ b/pkg/volume/util/finalizer.go @@ -22,4 +22,7 @@ const ( // PVProtectionFinalizer is the name of finalizer on PVs that are bound by PVCs PVProtectionFinalizer = "kubernetes.io/pv-protection" + + // VACProtectionFinalizer is the name of finalizer on VACs that are used by PVs or PVCs + VACProtectionFinalizer = "kubernetes.io/vac-protection" ) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 16b0a93f3c5..90deb4a5d25 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -427,6 +427,18 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) eventsRule(), }, }) + if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) { + addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "volumeattributesclass-protection-controller"}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("list", "watch", "get").Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie(), + rbacv1helpers.NewRule("list", "watch", "get").Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(), + rbacv1helpers.NewRule("get", "list", "watch", "update").Groups(storageGroup).Resources("volumeattributesclasses").RuleOrDie(), + eventsRule(), + }, + }) + } + addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-after-finished-controller"}, Rules: []rbacv1.PolicyRule{