diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 83395565b4b..16b31f5e5af 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -22,6 +22,7 @@ package app import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" "math/rand" "net/http" "os" @@ -74,6 +75,7 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/cmd/kube-controller-manager/names" kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" + garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount" ) @@ -227,7 +229,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { saTokenControllerDescriptor := newServiceAccountTokenControllerDescriptor(rootClientBuilder) run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) { - controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done()) + controllerContext, err := CreateControllerContext(ctx, c, rootClientBuilder, clientBuilder) if err != nil { logger.Error(err, "Error building controller context") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -378,6 +380,9 @@ type ControllerContext struct { // ControllerManagerMetrics provides a proxy to set controller manager specific metrics. ControllerManagerMetrics *controllersmetrics.ControllerManagerMetrics + + // GraphBuilder gives access to the dependencyGraphBuilder, which keeps track of resources in the cluster + GraphBuilder *garbagecollector.GraphBuilder } // IsControllerEnabled checks if the context's controllers enabled or not @@ -558,6 +563,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor { register(newValidatingAdmissionPolicyStatusControllerDescriptor()) register(newTaintEvictionControllerDescriptor()) register(newServiceCIDRsControllerDescriptor()) + register(newStorageVersionMigratorControllerDescriptor()) for _, alias := range aliases.UnsortedList() { if _, ok := controllers[alias]; ok { @@ -571,7 +577,7 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor { // CreateControllerContext creates a context struct containing references to resources needed by the // controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for // the shared-informers client and token controller. -func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) { +func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder) (ControllerContext, error) { // Informer transform to trim ManagedFields for memory efficiency.
trim := func(obj interface{}) (interface{}, error) { if accessor, err := meta.Accessor(obj); err == nil { @@ -598,15 +604,15 @@ func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, root restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) go wait.Until(func() { restMapper.Reset() - }, 30*time.Second, stop) + }, 30*time.Second, ctx.Done()) - cloud, loopMode, err := createCloudProvider(logger, s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin, + cloud, loopMode, err := createCloudProvider(klog.FromContext(ctx), s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin, s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers) if err != nil { return ControllerContext{}, err } - ctx := ControllerContext{ + controllerContext := ControllerContext{ ClientBuilder: clientBuilder, InformerFactory: sharedInformers, ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers), @@ -618,8 +624,26 @@ func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, root ResyncPeriod: ResyncPeriod(s), ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"), } + + if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector && + controllerContext.IsControllerEnabled(NewControllerDescriptors()[names.GarbageCollectorController]) { + ignoredResources := make(map[schema.GroupResource]struct{}) + for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources { + ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} + } + + controllerContext.GraphBuilder = garbagecollector.NewDependencyGraphBuilder( + ctx, + metadataClient, + controllerContext.RESTMapper, + ignoredResources, + controllerContext.ObjectOrMetadataInformerFactory, + controllerContext.InformersStarted, + ) + } + controllersmetrics.Register() - return ctx, nil + return controllerContext, nil } // StartControllers starts a set of controllers with a specified ControllerContext diff --git a/cmd/kube-controller-manager/app/controllermanager_test.go b/cmd/kube-controller-manager/app/controllermanager_test.go index fc4ee5512f8..67406a186e8 100644 --- a/cmd/kube-controller-manager/app/controllermanager_test.go +++ b/cmd/kube-controller-manager/app/controllermanager_test.go @@ -94,6 +94,7 @@ func TestControllerNamesDeclaration(t *testing.T) { names.LegacyServiceAccountTokenCleanerController, names.ValidatingAdmissionPolicyStatusController, names.ServiceCIDRController, + names.StorageVersionMigratorController, ) for _, name := range KnownControllers() { diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 2991f0e59c2..ab052c35ce8 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -688,17 +688,16 @@ func startGarbageCollectorController(ctx context.Context, controllerContext Cont for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources { ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} } - garbageCollector, err := garbagecollector.NewGarbageCollector( + + garbageCollector, err := garbagecollector.NewComposedGarbageCollector( ctx, gcClientset, metadataClient, 
controllerContext.RESTMapper, - ignoredResources, - controllerContext.ObjectOrMetadataInformerFactory, - controllerContext.InformersStarted, + controllerContext.GraphBuilder, ) if err != nil { - return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err) + return nil, true, fmt.Errorf("failed to start the generic garbage collector: %w", err) } // Start the garbage collector. diff --git a/cmd/kube-controller-manager/app/storageversionmigrator.go b/cmd/kube-controller-manager/app/storageversionmigrator.go new file mode 100644 index 00000000000..d1cb4f2160c --- /dev/null +++ b/cmd/kube-controller-manager/app/storageversionmigrator.go @@ -0,0 +1,92 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/metadata" + "k8s.io/controller-manager/controller" + "k8s.io/kubernetes/cmd/kube-controller-manager/names" + "k8s.io/kubernetes/pkg/features" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientgofeaturegate "k8s.io/client-go/features" + svm "k8s.io/kubernetes/pkg/controller/storageversionmigrator" +) + +func newStorageVersionMigratorControllerDescriptor() *ControllerDescriptor { + return &ControllerDescriptor{ + name: names.StorageVersionMigratorController, + aliases: []string{"svm"}, + initFunc: startSVMController, + } +} + +func startSVMController( + ctx context.Context, + controllerContext ControllerContext, + controllerName string, +) (controller.Interface, bool, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionMigrator) || + !clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { + return nil, false, nil + } + + if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { + return nil, true, fmt.Errorf("storage version migrator requires garbage collector") + } + + config := controllerContext.ClientBuilder.ConfigOrDie(controllerName) + client := controllerContext.ClientBuilder.ClientOrDie(controllerName) + informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations() + + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, false, err + } + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, false, err + } + + go svm.NewResourceVersionController( + ctx, + client, + discoveryClient, + metadata.NewForConfigOrDie(config), + informer, + controllerContext.RESTMapper, + ).Run(ctx) + + svmController := svm.NewSVMController( + ctx, + client, + dynamicClient, + informer, + controllerName, + controllerContext.RESTMapper, + controllerContext.GraphBuilder, + ) + go svmController.Run(ctx) + + return svmController, true, nil +} diff --git a/cmd/kube-controller-manager/names/controller_names.go b/cmd/kube-controller-manager/names/controller_names.go index 3418c41b46d..11597628474 100644 --- 
a/cmd/kube-controller-manager/names/controller_names.go +++ b/cmd/kube-controller-manager/names/controller_names.go @@ -83,4 +83,5 @@ const ( LegacyServiceAccountTokenCleanerController = "legacy-serviceaccount-token-cleaner-controller" ValidatingAdmissionPolicyStatusController = "validatingadmissionpolicy-status-controller" ServiceCIDRController = "service-cidr-controller" + StorageVersionMigratorController = "storage-version-migrator-controller" ) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index e66b58fdafa..7ca41910265 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -20,11 +20,11 @@ import ( "context" goerrors "errors" "fmt" + "k8s.io/controller-manager/pkg/informerfactory" "reflect" "sync" "time" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,10 +42,8 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/controller-manager/controller" - "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" c "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/apis/config/scheme" "k8s.io/kubernetes/pkg/controller/garbagecollector/metrics" ) @@ -93,36 +91,28 @@ func NewGarbageCollector( sharedInformers informerfactory.InformerFactory, informersStarted <-chan struct{}, ) (*GarbageCollector, error) { + graphBuilder := NewDependencyGraphBuilder(ctx, metadataClient, mapper, ignoredResources, sharedInformers, informersStarted) + return NewComposedGarbageCollector(ctx, kubeClient, metadataClient, mapper, graphBuilder) +} - eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) - eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}) +func NewComposedGarbageCollector( + ctx context.Context, + kubeClient clientset.Interface, + metadataClient metadata.Interface, + mapper meta.ResettableRESTMapper, + graphBuilder *GraphBuilder, +) (*GarbageCollector, error) { + attemptToDelete, attemptToOrphan, absentOwnerCache := graphBuilder.GetGraphResources() - attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") - attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan") - absentOwnerCache := NewReferenceCache(500) gc := &GarbageCollector{ - metadataClient: metadataClient, - restMapper: mapper, - attemptToDelete: attemptToDelete, - attemptToOrphan: attemptToOrphan, - absentOwnerCache: absentOwnerCache, - kubeClient: kubeClient, - eventBroadcaster: eventBroadcaster, - } - gc.dependencyGraphBuilder = &GraphBuilder{ - eventRecorder: eventRecorder, - metadataClient: metadataClient, - informersStarted: informersStarted, - restMapper: mapper, - graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), - uidToNode: &concurrentUIDToNode{ - uidToNode: make(map[types.UID]*node), - }, - attemptToDelete: attemptToDelete, - attemptToOrphan: attemptToOrphan, - absentOwnerCache: absentOwnerCache, - sharedInformers: sharedInformers, - ignoredResources: ignoredResources, + metadataClient: metadataClient, + restMapper: mapper, + attemptToDelete: attemptToDelete, + attemptToOrphan: attemptToOrphan, + absentOwnerCache: 
absentOwnerCache, + kubeClient: kubeClient, + eventBroadcaster: graphBuilder.eventBroadcaster, + dependencyGraphBuilder: graphBuilder, } metrics.Register() @@ -863,3 +853,8 @@ func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerR func (gc *GarbageCollector) Name() string { return "garbagecollector" } + +// GetDependencyGraphBuilder return graph builder which is particularly helpful for testing where controllerContext is not available +func (gc *GarbageCollector) GetDependencyGraphBuilder() *GraphBuilder { + return gc.dependencyGraphBuilder +} diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go index 9648664ed2b..24d8d9e321e 100644 --- a/pkg/controller/garbagecollector/graph_builder.go +++ b/pkg/controller/garbagecollector/graph_builder.go @@ -40,7 +40,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/controller-manager/pkg/informerfactory" - + "k8s.io/kubernetes/pkg/controller/apis/config/scheme" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) @@ -97,7 +97,8 @@ type GraphBuilder struct { // it is protected by monitorLock. running bool - eventRecorder record.EventRecorder + eventRecorder record.EventRecorder + eventBroadcaster record.EventBroadcaster metadataClient metadata.Interface // monitors are the producer of the graphChanges queue, graphBuilder alters @@ -134,6 +135,39 @@ func (m *monitor) Run() { type monitors map[schema.GroupVersionResource]*monitor +func NewDependencyGraphBuilder( + ctx context.Context, + metadataClient metadata.Interface, + mapper meta.ResettableRESTMapper, + ignoredResources map[schema.GroupResource]struct{}, + sharedInformers informerfactory.InformerFactory, + informersStarted <-chan struct{}, +) *GraphBuilder { + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) + + attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") + attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan") + absentOwnerCache := NewReferenceCache(500) + graphBuilder := &GraphBuilder{ + eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}), + eventBroadcaster: eventBroadcaster, + metadataClient: metadataClient, + informersStarted: informersStarted, + restMapper: mapper, + graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), + uidToNode: &concurrentUIDToNode{ + uidToNode: make(map[types.UID]*node), + }, + attemptToDelete: attemptToDelete, + attemptToOrphan: attemptToOrphan, + absentOwnerCache: absentOwnerCache, + sharedInformers: sharedInformers, + ignoredResources: ignoredResources, + } + + return graphBuilder +} + func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) { handlers := cache.ResourceEventHandlerFuncs{ // add the event to the dependencyGraphBuilder's graphChanges. 
@@ -935,3 +969,62 @@ func getAlternateOwnerIdentity(deps []*node, verifiedAbsentIdentity objectRefere // otherwise return the first alternate identity return first } + +func (gb *GraphBuilder) GetGraphResources() ( + attemptToDelete workqueue.RateLimitingInterface, + attemptToOrphan workqueue.RateLimitingInterface, + absentOwnerCache *ReferenceCache, +) { + return gb.attemptToDelete, gb.attemptToOrphan, gb.absentOwnerCache +} + +type Monitor struct { + Store cache.Store + Controller cache.Controller +} + +// GetMonitor returns a monitor for the given resource. +// If the monitor is not synced, it will return an error and the monitor to allow the caller to decide whether to retry. +// If the monitor is not found, it will return only an error. +func (gb *GraphBuilder) GetMonitor(ctx context.Context, resource schema.GroupVersionResource) (*Monitor, error) { + gb.monitorLock.RLock() + defer gb.monitorLock.RUnlock() + + var monitor *monitor + if m, ok := gb.monitors[resource]; ok { + monitor = m + } else { + for monitorGVR, m := range gb.monitors { + if monitorGVR.Group == resource.Group && monitorGVR.Resource == resource.Resource { + monitor = m + break + } + } + } + + if monitor == nil { + return nil, fmt.Errorf("no monitor found for resource %s", resource.String()) + } + + resourceMonitor := &Monitor{ + Store: monitor.store, + Controller: monitor.controller, + } + + if !cache.WaitForNamedCacheSync( + gb.Name(), + ctx.Done(), + func() bool { + return monitor.controller.HasSynced() + }, + ) { + // returning monitor to allow the caller to decide whether to retry as it can be synced later + return resourceMonitor, fmt.Errorf("dependency graph for resource %s is not synced", resource.String()) + } + + return resourceMonitor, nil +} + +func (gb *GraphBuilder) Name() string { + return "dependencygraphbuilder" +} diff --git a/pkg/controller/storageversionmigrator/resourceversion.go b/pkg/controller/storageversionmigrator/resourceversion.go new file mode 100644 index 00000000000..a6ab4c2b5ad --- /dev/null +++ b/pkg/controller/storageversionmigrator/resourceversion.go @@ -0,0 +1,284 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package storageversionmigrator + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/metadata" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller" + + svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1" + clientset "k8s.io/client-go/kubernetes" + svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1" +) + +const ( + // this name is guaranteed not to be present in the cluster as it is not a valid namespace name + fakeSVMNamespaceName string = "@fake:svm_ns!" + ResourceVersionControllerName string = "resource-version-controller" +) + +// ResourceVersionController adds the resource version obtained from a random nonexistent namespace +// to the SVM status before the migration is initiated. This resource version is utilized for checking +// the freshness of the GC cache before the migration is initiated. +type ResourceVersionController struct { + discoveryClient *discovery.DiscoveryClient + metadataClient metadata.Interface + svmListers svmlisters.StorageVersionMigrationLister + svmSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + kubeClient clientset.Interface + mapper meta.ResettableRESTMapper +} + +func NewResourceVersionController( + ctx context.Context, + kubeClient clientset.Interface, + discoveryClient *discovery.DiscoveryClient, + metadataClient metadata.Interface, + svmInformer svminformers.StorageVersionMigrationInformer, + mapper meta.ResettableRESTMapper, +) *ResourceVersionController { + logger := klog.FromContext(ctx) + + rvController := &ResourceVersionController{ + kubeClient: kubeClient, + discoveryClient: discoveryClient, + metadataClient: metadataClient, + svmListers: svmInformer.Lister(), + svmSynced: svmInformer.Informer().HasSynced, + mapper: mapper, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceVersionControllerName), + } + + _, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + rvController.addSVM(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + rvController.updateSVM(logger, oldObj, newObj) + }, + }) + + return rvController +} + +func (rv *ResourceVersionController) addSVM(logger klog.Logger, obj interface{}) { + svm := obj.(*svmv1alpha1.StorageVersionMigration) + logger.V(4).Info("Adding", "svm", klog.KObj(svm)) + rv.enqueue(svm) +} + +func (rv *ResourceVersionController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) { + oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration) + newSVM := newObj.(*svmv1alpha1.StorageVersionMigration) + logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM)) + rv.enqueue(newSVM) +} + +func (rv *ResourceVersionController) enqueue(svm *svmv1alpha1.StorageVersionMigration) { + key, err := controller.KeyFunc(svm) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err)) + return + } + + rv.queue.Add(key) +} + +func (rv *ResourceVersionController) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + defer rv.queue.ShutDown() + + logger := klog.FromContext(ctx) +
logger.Info("Starting", "controller", ResourceVersionControllerName) + defer logger.Info("Shutting down", "controller", ResourceVersionControllerName) + + if !cache.WaitForNamedCacheSync(ResourceVersionControllerName, ctx.Done(), rv.svmSynced) { + return + } + + go wait.UntilWithContext(ctx, rv.worker, time.Second) + + <-ctx.Done() +} + +func (rv *ResourceVersionController) worker(ctx context.Context) { + for rv.processNext(ctx) { + } +} + +func (rv *ResourceVersionController) processNext(ctx context.Context) bool { + eKey, quit := rv.queue.Get() + if quit { + return false + } + defer rv.queue.Done(eKey) + + key := eKey.(string) + err := rv.sync(ctx, key) + if err == nil { + rv.queue.Forget(key) + return true + } + + klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err) + rv.queue.AddRateLimited(key) + + return true +} + +func (rv *ResourceVersionController) sync(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) + startTime := time.Now() + + // SVM is a cluster scoped resource so we don't care about the namespace + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + svm, err := rv.svmListers.Get(name) + if apierrors.IsNotFound(err) { + // no work to do, don't fail and requeue + return nil + } + if err != nil { + return err + } + // working with copy to avoid race condition between this and migration controller + toBeProcessedSVM := svm.DeepCopy() + gvr := getGVRFromResource(toBeProcessedSVM) + + if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) { + logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name) + return nil + } + + if len(toBeProcessedSVM.Status.ResourceVersion) != 0 { + logger.V(4).Info("Resource version is already set", "svm", name) + return nil + } + + exists, err := rv.resourceExists(gvr) + if err != nil { + return err + } + if !exists { + _, err = rv.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + + return nil + } + + toBeProcessedSVM.Status.ResourceVersion, err = rv.getLatestResourceVersion(gvr, ctx) + if err != nil { + return err + } + + _, err = rv.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). + UpdateStatus(ctx, toBeProcessedSVM, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error updating status for %s: %w", toBeProcessedSVM.Name, err) + } + + logger.V(4).Info("Resource version has been successfully added", "svm", key, "elapsed", time.Since(startTime)) + return nil +} + +func (rv *ResourceVersionController) getLatestResourceVersion(gvr schema.GroupVersionResource, ctx context.Context) (string, error) { + isResourceNamespaceScoped, err := rv.isResourceNamespaceScoped(gvr) + if err != nil { + return "", err + } + + var randomList *metav1.PartialObjectMetadataList + if isResourceNamespaceScoped { + // get list resourceVersion from random non-existent namesapce for the given GVR + randomList, err = rv.metadataClient.Resource(gvr). + Namespace(fakeSVMNamespaceName). + List(ctx, metav1.ListOptions{ + Limit: 1, + }) + } else { + randomList, err = rv.metadataClient.Resource(gvr). + List(ctx, metav1.ListOptions{ + Limit: 1, + }) + } + if err != nil { + // error here is very abstract. 
adding additional context for better debugging + return "", fmt.Errorf("error getting latest resourceVersion for %s: %w", gvr.String(), err) + } + + return randomList.GetResourceVersion(), err +} + +func (rv *ResourceVersionController) resourceExists(gvr schema.GroupVersionResource) (bool, error) { + mapperGVRs, err := rv.mapper.ResourcesFor(gvr) + if err != nil { + return false, err + } + + for _, mapperGVR := range mapperGVRs { + if mapperGVR.Group == gvr.Group && + mapperGVR.Version == gvr.Version && + mapperGVR.Resource == gvr.Resource { + return true, nil + } + } + + return false, nil +} + +func (rv *ResourceVersionController) isResourceNamespaceScoped(gvr schema.GroupVersionResource) (bool, error) { + resourceList, err := rv.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) + if err != nil { + return false, err + } + + for _, resource := range resourceList.APIResources { + if resource.Name == gvr.Resource { + return resource.Namespaced, nil + } + } + + return false, fmt.Errorf("resource %q not found", gvr.String()) +} diff --git a/pkg/controller/storageversionmigrator/storageversionmigrator.go b/pkg/controller/storageversionmigrator/storageversionmigrator.go new file mode 100644 index 00000000000..2b2d420b028 --- /dev/null +++ b/pkg/controller/storageversionmigrator/storageversionmigrator.go @@ -0,0 +1,318 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package storageversionmigrator + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "k8s.io/klog/v2" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + + svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1" + svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1" +) + +const ( + workers = 5 + migrationSuccessStatusReason = "StorageVersionMigrationSucceeded" + migrationRunningStatusReason = "StorageVersionMigrationInProgress" + migrationFailedStatusReason = "StorageVersionMigrationFailed" +) + +type SVMController struct { + controllerName string + kubeClient kubernetes.Interface + dynamicClient *dynamic.DynamicClient + svmListers svmlisters.StorageVersionMigrationLister + svmSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + restMapper meta.RESTMapper + dependencyGraphBuilder *garbagecollector.GraphBuilder +} + +func NewSVMController( + ctx context.Context, + kubeClient kubernetes.Interface, + dynamicClient *dynamic.DynamicClient, + svmInformer svminformers.StorageVersionMigrationInformer, + controllerName string, + mapper meta.ResettableRESTMapper, + dependencyGraphBuilder *garbagecollector.GraphBuilder, +) *SVMController { + logger := klog.FromContext(ctx) + + svmController := &SVMController{ + kubeClient: kubeClient, + dynamicClient: dynamicClient, + controllerName: controllerName, + svmListers: svmInformer.Lister(), + svmSynced: svmInformer.Informer().HasSynced, + restMapper: mapper, + dependencyGraphBuilder: dependencyGraphBuilder, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), + } + + _, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svmController.addSVM(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + svmController.updateSVM(logger, oldObj, newObj) + }, + }) + + return svmController +} + +func (svmc *SVMController) Name() string { + return svmc.controllerName +} + +func (svmc *SVMController) addSVM(logger klog.Logger, obj interface{}) { + svm := obj.(*svmv1alpha1.StorageVersionMigration) + logger.V(4).Info("Adding", "svm", klog.KObj(svm)) + svmc.enqueue(svm) +} + +func (svmc *SVMController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) { + oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration) + newSVM := newObj.(*svmv1alpha1.StorageVersionMigration) + logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM)) + svmc.enqueue(newSVM) +} + +func (svmc *SVMController) enqueue(svm *svmv1alpha1.StorageVersionMigration) { + key, err := controller.KeyFunc(svm) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err)) + return + } + + svmc.queue.Add(key) +} + +func (svmc *SVMController) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + defer svmc.queue.ShutDown() + + logger := klog.FromContext(ctx) + logger.Info("Starting", "controller", svmc.controllerName) + defer logger.Info("Shutting down", "controller", svmc.controllerName) + + if 
!cache.WaitForNamedCacheSync(svmc.controllerName, ctx.Done(), svmc.svmSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, svmc.worker, time.Second) + } + + <-ctx.Done() +} + +func (svmc *SVMController) worker(ctx context.Context) { + for svmc.processNext(ctx) { + } +} + +func (svmc *SVMController) processNext(ctx context.Context) bool { + svmKey, quit := svmc.queue.Get() + if quit { + return false + } + defer svmc.queue.Done(svmKey) + + key := svmKey.(string) + err := svmc.sync(ctx, key) + if err == nil { + svmc.queue.Forget(key) + return true + } + + klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err) + svmc.queue.AddRateLimited(key) + + return true +} + +func (svmc *SVMController) sync(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) + startTime := time.Now() + + if svmc.dependencyGraphBuilder == nil { + logger.V(4).Info("dependency graph builder is not set. we will skip migration") + return nil + } + + // SVM is a cluster scoped resource so we don't care about the namespace + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + svm, err := svmc.svmListers.Get(name) + if apierrors.IsNotFound(err) { + // no work to do, don't fail and requeue + return nil + } + if err != nil { + return err + } + // working with a copy to avoid race condition between this and resource version controller + toBeProcessedSVM := svm.DeepCopy() + + if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) { + logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name) + return nil + } + + if len(toBeProcessedSVM.Status.ResourceVersion) == 0 { + logger.V(4).Info("The latest resource version is empty. We will attempt to migrate once the resource version is available.") + return nil + } + gvr := getGVRFromResource(toBeProcessedSVM) + + resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr) + if resourceMonitor != nil { + if err != nil { + // non nil monitor indicates that error is due to resource not being synced + return fmt.Errorf("dependency graph is not synced, requeuing to attempt again") + } + } else { + // we can't migrate a resource that doesn't exist in the GC + _, err = svmc.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String()) + + return nil + } + + gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion()) + if err != nil { + return err + } + listResourceVersion, err := convertResourceVersionToInt(toBeProcessedSVM.Status.ResourceVersion) + if err != nil { + return err + } + + if gcListResourceVersion < listResourceVersion { + return fmt.Errorf("GC cache is not up to date, requeuing to attempt again. gcListResourceVersion: %d, listResourceVersion: %d", gcListResourceVersion, listResourceVersion) + } + + toBeProcessedSVM, err = svmc.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). 
+ UpdateStatus( + ctx, + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason), + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + + gvk, err := svmc.restMapper.KindFor(gvr) + if err != nil { + return err + } + typeMeta := metav1.TypeMeta{} + typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind() + data, err := json.Marshal(typeMeta) + if err != nil { + return err + } + + // ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure + // process storage migration + for _, gvrKey := range resourceMonitor.Store.ListKeys() { + namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey) + if err != nil { + return err + } + + _, err = svmc.dynamicClient.Resource(gvr). + Namespace(namespace). + Patch(ctx, + name, + types.ApplyPatchType, + data, + metav1.PatchOptions{ + FieldManager: svmc.controllerName, + }, + ) + if err != nil { + // in case of NotFound or Conflict, we can stop processing migration for that resource + if apierrors.IsNotFound(err) || apierrors.IsConflict(err) { + continue + } + + _, err = svmc.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err)) + + return nil + // Todo: add retry for scenarios where API server returns rate limiting error + } + logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String()) + } + + _, err = svmc.kubeClient.StoragemigrationV1alpha1(). + StorageVersionMigrations(). + UpdateStatus( + ctx, + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason), + metav1.UpdateOptions{}, + ) + if err != nil { + return err + } + + logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime)) + return nil +} diff --git a/pkg/controller/storageversionmigrator/util.go b/pkg/controller/storageversionmigrator/util.go new file mode 100644 index 00000000000..08f6efc0188 --- /dev/null +++ b/pkg/controller/storageversionmigrator/util.go @@ -0,0 +1,84 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package storageversionmigrator + +import ( + "fmt" + "strconv" + + "k8s.io/apimachinery/pkg/runtime/schema" + + corev1 "k8s.io/api/core/v1" + svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func convertResourceVersionToInt(rv string) (int64, error) { + resourceVersion, err := strconv.ParseInt(rv, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse resource version %q: %w", rv, err) + } + + return resourceVersion, nil +} + +func getGVRFromResource(svm *svmv1alpha1.StorageVersionMigration) schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: svm.Spec.Resource.Group, + Version: svm.Spec.Resource.Version, + Resource: svm.Spec.Resource.Resource, + } +} + +// IsConditionTrue returns true if the StorageVersionMigration has the given condition +// It is exported for use in tests +func IsConditionTrue(svm *svmv1alpha1.StorageVersionMigration, conditionType svmv1alpha1.MigrationConditionType) bool { + return indexOfCondition(svm, conditionType) != -1 +} + +func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType svmv1alpha1.MigrationConditionType) int { + for i, c := range svm.Status.Conditions { + if c.Type == conditionType && c.Status == corev1.ConditionTrue { + return i + } + } + return -1 +} + +func setStatusConditions( + toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration, + conditionType svmv1alpha1.MigrationConditionType, + reason string, +) *svmv1alpha1.StorageVersionMigration { + if !IsConditionTrue(toBeUpdatedSVM, conditionType) { + if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed { + runningConditionIdx := indexOfCondition(toBeUpdatedSVM, svmv1alpha1.MigrationRunning) + if runningConditionIdx != -1 { + toBeUpdatedSVM.Status.Conditions[runningConditionIdx].Status = corev1.ConditionFalse + } + } + + toBeUpdatedSVM.Status.Conditions = append(toBeUpdatedSVM.Status.Conditions, svmv1alpha1.MigrationCondition{ + Type: conditionType, + Status: corev1.ConditionTrue, + LastUpdateTime: metav1.Now(), + Reason: reason, + }) + } + + return toBeUpdatedSVM +} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index a79373af461..b39ac933193 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -487,6 +487,18 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) }, }) + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionMigrator) { + addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: saRolePrefix + "storage-version-migrator-controller", + }, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(), + rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(), + }, + }) + } + return controllerRoles, controllerRoleBindings } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index b8056c4e6d0..f91da6e2673 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -42,27 +42,28 @@ var ( ) const ( - legacyGroup = "" - appsGroup = "apps" - 
authenticationGroup = "authentication.k8s.io" - authorizationGroup = "authorization.k8s.io" - autoscalingGroup = "autoscaling" - batchGroup = "batch" - certificatesGroup = "certificates.k8s.io" - coordinationGroup = "coordination.k8s.io" - discoveryGroup = "discovery.k8s.io" - extensionsGroup = "extensions" - policyGroup = "policy" - rbacGroup = "rbac.authorization.k8s.io" - resourceGroup = "resource.k8s.io" - storageGroup = "storage.k8s.io" - resMetricsGroup = "metrics.k8s.io" - customMetricsGroup = "custom.metrics.k8s.io" - externalMetricsGroup = "external.metrics.k8s.io" - networkingGroup = "networking.k8s.io" - eventsGroup = "events.k8s.io" - internalAPIServerGroup = "internal.apiserver.k8s.io" - admissionRegistrationGroup = "admissionregistration.k8s.io" + legacyGroup = "" + appsGroup = "apps" + authenticationGroup = "authentication.k8s.io" + authorizationGroup = "authorization.k8s.io" + autoscalingGroup = "autoscaling" + batchGroup = "batch" + certificatesGroup = "certificates.k8s.io" + coordinationGroup = "coordination.k8s.io" + discoveryGroup = "discovery.k8s.io" + extensionsGroup = "extensions" + policyGroup = "policy" + rbacGroup = "rbac.authorization.k8s.io" + resourceGroup = "resource.k8s.io" + storageGroup = "storage.k8s.io" + resMetricsGroup = "metrics.k8s.io" + customMetricsGroup = "custom.metrics.k8s.io" + externalMetricsGroup = "external.metrics.k8s.io" + networkingGroup = "networking.k8s.io" + eventsGroup = "events.k8s.io" + internalAPIServerGroup = "internal.apiserver.k8s.io" + admissionRegistrationGroup = "admissionregistration.k8s.io" + storageVersionMigrationGroup = "storagemigration.k8s.io" ) func addDefaultMetadata(obj runtime.Object) { diff --git a/staging/src/k8s.io/client-go/features/known_features.go b/staging/src/k8s.io/client-go/features/known_features.go index 329eed72574..0c972a46fd5 100644 --- a/staging/src/k8s.io/client-go/features/known_features.go +++ b/staging/src/k8s.io/client-go/features/known_features.go @@ -37,6 +37,10 @@ const ( // The feature is disabled in Beta by default because // it will only be turned on for selected control plane component(s). WatchListClient Feature = "WatchListClient" + + // owner: @nilekhc + // alpha: v1.30 + InformerResourceVersion Feature = "InformerResourceVersion" ) // defaultKubernetesFeatureGates consists of all known Kubernetes-specific feature keys. @@ -45,5 +49,6 @@ const ( // After registering with the binary, the features are, by default, controllable using environment variables. // For more details, please see envVarFeatureGates implementation. 
var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{ - WatchListClient: {Default: false, PreRelease: Beta}, + WatchListClient: {Default: false, PreRelease: Beta}, + InformerResourceVersion: {Default: false, PreRelease: Alpha}, } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index a06df6e6e78..c805030bd73 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -31,6 +31,8 @@ import ( "k8s.io/utils/clock" "k8s.io/klog/v2" + + clientgofeaturegate "k8s.io/client-go/features" ) // SharedInformer provides eventually consistent linkage of its @@ -409,6 +411,10 @@ func (v *dummyController) HasSynced() bool { } func (v *dummyController) LastSyncResourceVersion() string { + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { + return v.informer.LastSyncResourceVersion() + } + return "" } diff --git a/test/integration/storageversionmigrator/main_test.go b/test/integration/storageversionmigrator/main_test.go new file mode 100644 index 00000000000..6062f62e4d7 --- /dev/null +++ b/test/integration/storageversionmigrator/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversionmigrator + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/storageversionmigrator/storageversionmigrator_test.go b/test/integration/storageversionmigrator/storageversionmigrator_test.go new file mode 100644 index 00000000000..c7523477bf5 --- /dev/null +++ b/test/integration/storageversionmigrator/storageversionmigrator_test.go @@ -0,0 +1,270 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package storageversionmigrator + +import ( + "bytes" + "context" + "testing" + "time" + + etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/klog/v2/ktesting" + + svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller" + utilfeature "k8s.io/apiserver/pkg/util/feature" + clientgofeaturegate "k8s.io/client-go/features" + "k8s.io/component-base/featuregate" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" +) + +// TestStorageVersionMigration is an integration test that verifies storage version migration works. +// This test asserts following scenarios: +// 1. Start API server with encryption at rest and hot reload of encryption config enabled +// 2. Create a secret +// 3. Update encryption config file to add a new key as write key +// 4. Perform Storage Version Migration for secrets +// 5. Verify that the secret is migrated to use the new key +// 6. Verify that the secret is updated with a new resource version +// 7. Perform another Storage Version Migration for secrets +// 8. Verify that the resource version of the secret is not updated. i.e. it was a no-op update +func TestStorageVersionMigration(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)() + + // this makes the test super responsive. It's set to a default of 1 minute. + encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Millisecond + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + svmTest := svmSetup(ctx, t) + + // ToDo: try to test with 1000 secrets + secret, err := svmTest.createSecret(ctx, t, secretName, defaultNamespace) + if err != nil { + t.Fatalf("Failed to create secret: %v", err) + } + + metricBeforeUpdate := svmTest.getAutomaticReloadSuccessTotal(ctx, t) + svmTest.updateFile(t, svmTest.filePathForEncryptionConfig, encryptionConfigFileName, []byte(resources["updatedEncryptionConfig"])) + if !svmTest.isEncryptionConfigFileUpdated(ctx, t, metricBeforeUpdate) { + t.Fatalf("Failed to update encryption config file") + } + + svm, err := svmTest.createSVMResource( + ctx, + t, + svmName, + svmv1alpha1.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + ) + if err != nil { + t.Fatalf("Failed to create SVM resource: %v", err) + } + if !svmTest.waitForResourceMigration(ctx, t, svm.Name, secret.Name, 1) { + t.Fatalf("Failed to migrate resource %s/%s", secret.Namespace, secret.Name) + } + + wantPrefix := "k8s:enc:aescbc:v1:key2" + etcdSecret, err := svmTest.getRawSecretFromETCD(t, secret.Name, secret.Namespace) + if err != nil { + t.Fatalf("Failed to get secret from etcd: %v", err) + } + // assert that secret is prefixed with the new key + if !bytes.HasPrefix(etcdSecret, []byte(wantPrefix)) { + t.Fatalf("expected secret to be prefixed with %s, but got %s", wantPrefix, etcdSecret) + } + + secretAfterMigration, err := svmTest.client.CoreV1().Secrets(secret.Namespace).Get(ctx, secret.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get secret: %v", err) + } + // assert that RV is different + // rv is expected to be different as the secret was re-written to etcd with the 
new key + if secret.ResourceVersion == secretAfterMigration.ResourceVersion { + t.Fatalf("Expected resource version to be different, but got the same, rv before: %s, rv after: %s", secret.ResourceVersion, secretAfterMigration.ResourceVersion) + } + + secondSVM, err := svmTest.createSVMResource( + ctx, + t, + secondSVMName, + svmv1alpha1.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + ) + if err != nil { + t.Fatalf("Failed to create SVM resource: %v", err) + } + if !svmTest.waitForResourceMigration(ctx, t, secondSVM.Name, secretAfterMigration.Name, 2) { + t.Fatalf("Failed to migrate resource %s/%s", secretAfterMigration.Namespace, secretAfterMigration.Name) + } + + secretAfterSecondMigration, err := svmTest.client.CoreV1().Secrets(secretAfterMigration.Namespace).Get(ctx, secretAfterMigration.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get secret: %v", err) + } + // assert that RV is same + if secretAfterMigration.ResourceVersion != secretAfterSecondMigration.ResourceVersion { + t.Fatalf("Expected resource version to be same, but got different, rv before: %s, rv after: %s", secretAfterMigration.ResourceVersion, secretAfterSecondMigration.ResourceVersion) + } +} + +// TestStorageVersionMigrationWithCRD is an integration test that verifies storage version migration works with CRD. +// This test asserts following scenarios: +// 1. CRD is created with version v1 (serving and storage) +// 2. Verify that CRs are written and stored as v1 +// 3. Update CRD to introduce v2 (for serving only), and a conversion webhook is added +// 4. Verify that CRs are written to v2 but are stored as v1 +// 5. CRD storage version is changed from v1 to v2 +// 6. Verify that CR written as either v1 or v2 version are stored as v2 +// 7. Perform Storage Version Migration to migrate all v1 CRs to v2 +// 8. CRD is updated to no longer serve v1 +// 9. Shutdown conversion webhook +// 10. Verify RV and Generations of CRs +// 11. 
Verify the list of CRs at v2 works +func TestStorageVersionMigrationWithCRD(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)() + // decode errors are expected when using conversion webhooks + etcd3watcher.TestOnlySetFatalOnDecodeError(false) + defer etcd3watcher.TestOnlySetFatalOnDecodeError(true) + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + crVersions := make(map[string]versions) + + svmTest := svmSetup(ctx, t) + certCtx := svmTest.setupServerCert(t) + + // create CRD with v1 serving and storage + crd := svmTest.createCRD(t, crdName, crdGroup, certCtx, v1CRDVersion) + + // create CR + cr1 := svmTest.createCR(ctx, t, "cr1", "v1") + if ok := svmTest.isCRStoredAtVersion(t, "v1", cr1.GetName()); !ok { + t.Fatalf("CR not stored at version v1") + } + crVersions[cr1.GetName()] = versions{ + generation: cr1.GetGeneration(), + rv: cr1.GetResourceVersion(), + isRVUpdated: true, + } + + // add conversion webhook + shutdownServer := svmTest.createConversionWebhook(ctx, t, certCtx) + + // add v2 for serving only + svmTest.updateCRD(ctx, t, crd.Name, v2CRDVersion) + + // create another CR + cr2 := svmTest.createCR(ctx, t, "cr2", "v2") + if ok := svmTest.isCRStoredAtVersion(t, "v1", cr2.GetName()); !ok { + t.Fatalf("CR not stored at version v1") + } + crVersions[cr2.GetName()] = versions{ + generation: cr2.GetGeneration(), + rv: cr2.GetResourceVersion(), + isRVUpdated: true, + } + + // add v2 as storage version + svmTest.updateCRD(ctx, t, crd.Name, v2StorageCRDVersion) + + // create CR with v1 + cr3 := svmTest.createCR(ctx, t, "cr3", "v1") + if ok := svmTest.isCRStoredAtVersion(t, "v2", cr3.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + crVersions[cr3.GetName()] = versions{ + generation: cr3.GetGeneration(), + rv: cr3.GetResourceVersion(), + isRVUpdated: false, + } + + // create CR with v2 + cr4 := svmTest.createCR(ctx, t, "cr4", "v2") + if ok := svmTest.isCRStoredAtVersion(t, "v2", cr4.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + crVersions[cr4.GetName()] = versions{ + generation: cr4.GetGeneration(), + rv: cr4.GetResourceVersion(), + isRVUpdated: false, + } + + // verify cr1 and cr2 are still stored at v1 + if ok := svmTest.isCRStoredAtVersion(t, "v1", cr1.GetName()); !ok { + t.Fatalf("CR not stored at version v1") + } + if ok := svmTest.isCRStoredAtVersion(t, "v1", cr2.GetName()); !ok { + t.Fatalf("CR not stored at version v1") + } + + // migrate CRs from v1 to v2 + svm, err := svmTest.createSVMResource( + ctx, t, "crdsvm", + svmv1alpha1.GroupVersionResource{ + Group: crd.Spec.Group, + Version: "v1", + Resource: crd.Spec.Names.Plural, + }) + if err != nil { + t.Fatalf("Failed to create SVM resource: %v", err) + } + if ok := svmTest.isCRDMigrated(ctx, t, svm.Name); !ok { + t.Fatalf("CRD not migrated") + } + + // assert all the CRs are stored in etcd at the correct version + if ok := svmTest.isCRStoredAtVersion(t, "v2", cr1.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + if ok := svmTest.isCRStoredAtVersion(t, "v2", cr2.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + if ok := svmTest.isCRStoredAtVersion(t, "v2", cr3.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + if ok := svmTest.isCRStoredAtVersion(t,
"v2", cr4.GetName()); !ok { + t.Fatalf("CR not stored at version v2") + } + + // update CRD to v1 not serving and storage followed by webhook shutdown + svmTest.updateCRD(ctx, t, crd.Name, v1NotServingCRDVersion) + shutdownServer() + + // assert RV and Generations of CRs + svmTest.validateRVAndGeneration(ctx, t, crVersions) + + // assert v2 CRs can be listed + if err := svmTest.listCR(ctx, t, "v2"); err != nil { + t.Fatalf("Failed to list CRs at version v2: %v", err) + } +} diff --git a/test/integration/storageversionmigrator/util.go b/test/integration/storageversionmigrator/util.go new file mode 100644 index 00000000000..93f86eea117 --- /dev/null +++ b/test/integration/storageversionmigrator/util.go @@ -0,0 +1,1060 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversionmigrator + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "net" + "net/http" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + corev1 "k8s.io/api/core/v1" + svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + crdintegration "k8s.io/apiextensions-apiserver/test/integration" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + auditinternal "k8s.io/apiserver/pkg/apis/audit" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" + "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/util/cert" + "k8s.io/client-go/util/keyutil" + utiltesting "k8s.io/client-go/util/testing" + "k8s.io/controller-manager/pkg/informerfactory" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/cmd/kube-controller-manager/names" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + "k8s.io/kubernetes/pkg/controller/storageversionmigrator" + "k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter" + "k8s.io/kubernetes/test/integration" + "k8s.io/kubernetes/test/integration/etcd" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils" + utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" +) + +const ( + secretKey = "api_key" + secretVal = "086a7ffc-0225-11e8-ba89-0ed5f89f718b" // Fake value for testing. 
+ secretName = "test-secret" + triggerSecretName = "trigger-for-svm" + svmName = "test-svm" + secondSVMName = "second-test-svm" + auditPolicyFileName = "audit-policy.yaml" + auditLogFileName = "audit.log" + encryptionConfigFileName = "encryption.conf" + metricPrefix = "apiserver_encryption_config_controller_automatic_reload_success_total" + defaultNamespace = "default" + crdName = "testcrd" + crdGroup = "stable.example.com" + servicePort = int32(9443) + webhookHandler = "crdconvert" +) + +var ( + resources = map[string]string{ + "auditPolicy": ` +apiVersion: audit.k8s.io/v1 +kind: Policy +omitStages: + - "RequestReceived" +rules: + - level: Metadata + resources: + - group: "" + resources: ["secrets"] + verbs: ["patch"] +`, + "initialEncryptionConfig": ` +kind: EncryptionConfiguration +apiVersion: apiserver.config.k8s.io/v1 +resources: + - resources: + - secrets + providers: + - aescbc: + keys: + - name: key1 + secret: c2VjcmV0IGlzIHNlY3VyZQ== +`, + "updatedEncryptionConfig": ` +kind: EncryptionConfiguration +apiVersion: apiserver.config.k8s.io/v1 +resources: + - resources: + - secrets + providers: + - aescbc: + keys: + - name: key2 + secret: c2VjcmV0IGlzIHNlY3VyZSwgaXMgaXQ/ + - aescbc: + keys: + - name: key1 + secret: c2VjcmV0IGlzIHNlY3VyZQ== +`, + } + + v1CRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "hostPort": {Type: "string"}, + }, + }, + }, + }, + } + v2CRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v2", + Served: true, + Storage: false, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "host": {Type: "string"}, + "port": {Type: "string"}, + }, + }, + }, + }, + { + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "hostPort": {Type: "string"}, + }, + }, + }, + }, + } + v2StorageCRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: true, + Storage: false, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "hostPort": {Type: "string"}, + }, + }, + }, + }, + { + Name: "v2", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "host": {Type: "string"}, + "port": {Type: "string"}, + }, + }, + }, + }, + } + v1NotServingCRDVersion = []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Served: false, + Storage: false, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "hostPort": {Type: "string"}, + }, + }, + }, + }, + { + Name: "v2", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: 
map[string]apiextensionsv1.JSONSchemaProps{ + "host": {Type: "string"}, + "port": {Type: "string"}, + }, + }, + }, + }, + } +) + +type svmTest struct { + policyFile *os.File + logFile *os.File + client clientset.Interface + clientConfig *rest.Config + dynamicClient *dynamic.DynamicClient + storageConfig *storagebackend.Config + server *kubeapiservertesting.TestServer + apiextensionsclient *apiextensionsclientset.Clientset + filePathForEncryptionConfig string +} + +func svmSetup(ctx context.Context, t *testing.T) *svmTest { + t.Helper() + + filePathForEncryptionConfig, err := createEncryptionConfig(t, resources["initialEncryptionConfig"]) + if err != nil { + t.Fatalf("failed to create encryption config: %v", err) + } + + policyFile, logFile := setupAudit(t) + apiServerFlags := []string{ + "--encryption-provider-config", filepath.Join(filePathForEncryptionConfig, encryptionConfigFileName), + "--encryption-provider-config-automatic-reload=true", + "--disable-admission-plugins", "ServiceAccount", + "--audit-policy-file", policyFile.Name(), + "--audit-log-version", "audit.k8s.io/v1", + "--audit-log-mode", "blocking", + "--audit-log-path", logFile.Name(), + } + storageConfig := framework.SharedEtcd() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig) + + clientSet, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("error in create clientset: %v", err) + } + + discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) + rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("failed to create discovery client: %v", err) + } + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + restMapper.Reset() + metadataClient, err := metadata.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("failed to create metadataClient: %v", err) + } + dynamicClient, err := dynamic.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("error in create dynamic client: %v", err) + } + sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) + metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) + alwaysStarted := make(chan struct{}) + close(alwaysStarted) + + gc, err := garbagecollector.NewGarbageCollector( + ctx, + clientSet, + metadataClient, + restMapper, + garbagecollector.DefaultIgnoredResources(), + informerfactory.NewInformerFactory(sharedInformers, metadataInformers), + alwaysStarted, + ) + if err != nil { + t.Fatalf("error while creating garbage collector: %v", err) + + } + startGC := func() { + syncPeriod := 5 * time.Second + go wait.Until(func() { + restMapper.Reset() + }, syncPeriod, ctx.Done()) + go gc.Run(ctx, 1) + go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) + } + + svmController := storageversionmigrator.NewSVMController( + ctx, + clientSet, + dynamicClient, + sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), + names.StorageVersionMigratorController, + restMapper, + gc.GetDependencyGraphBuilder(), + ) + + rvController := storageversionmigrator.NewResourceVersionController( + ctx, + clientSet, + rvDiscoveryClient, + metadataClient, + sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), + restMapper, + ) + + // Start informer and controllers + sharedInformers.Start(ctx.Done()) + startGC() + go svmController.Run(ctx) + go rvController.Run(ctx) + + svmTest := &svmTest{ + storageConfig: storageConfig, + server: server, + client: 
clientSet, + clientConfig: server.ClientConfig, + dynamicClient: dynamicClient, + policyFile: policyFile, + logFile: logFile, + filePathForEncryptionConfig: filePathForEncryptionConfig, + } + + t.Cleanup(func() { + server.TearDownFn() + utiltesting.CloseAndRemove(t, svmTest.logFile) + utiltesting.CloseAndRemove(t, svmTest.policyFile) + err = os.RemoveAll(svmTest.filePathForEncryptionConfig) + if err != nil { + t.Errorf("error while removing temp directory: %v", err) + } + }) + + return svmTest +} + +func createEncryptionConfig(t *testing.T, encryptionConfig string) ( + filePathForEncryptionConfig string, + err error, +) { + t.Helper() + tempDir, err := os.MkdirTemp("", svmName) + if err != nil { + return "", fmt.Errorf("failed to create temp directory: %w", err) + } + + if err = os.WriteFile(filepath.Join(tempDir, encryptionConfigFileName), []byte(encryptionConfig), 0644); err != nil { + err = os.RemoveAll(tempDir) + if err != nil { + t.Errorf("error while removing temp directory: %v", err) + } + return tempDir, fmt.Errorf("error while writing encryption config: %w", err) + } + + return tempDir, nil +} + +func (svm *svmTest) createSecret(ctx context.Context, t *testing.T, name, namespace string) (*corev1.Secret, error) { + t.Helper() + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: map[string][]byte{ + secretKey: []byte(secretVal), + }, + } + + return svm.client.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{}) +} + +func (svm *svmTest) getRawSecretFromETCD(t *testing.T, name, namespace string) ([]byte, error) { + t.Helper() + secretETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, "", "secrets", name, namespace) + etcdResponse, err := svm.readRawRecordFromETCD(t, secretETCDPath) + if err != nil { + return nil, fmt.Errorf("failed to read %s from etcd: %w", secretETCDPath, err) + } + return etcdResponse.Kvs[0].Value, nil +} + +func (svm *svmTest) getETCDPathForResource(t *testing.T, storagePrefix, group, resource, name, namespaceName string) string { + t.Helper() + groupResource := resource + if group != "" { + groupResource = fmt.Sprintf("%s/%s", group, resource) + } + if namespaceName == "" { + return fmt.Sprintf("/%s/%s/%s", storagePrefix, groupResource, name) + } + return fmt.Sprintf("/%s/%s/%s/%s", storagePrefix, groupResource, namespaceName, name) +} + +func (svm *svmTest) readRawRecordFromETCD(t *testing.T, path string) (*clientv3.GetResponse, error) { + t.Helper() + rawClient, etcdClient, err := integration.GetEtcdClients(svm.server.ServerOpts.Etcd.StorageConfig.Transport) + if err != nil { + return nil, fmt.Errorf("failed to create etcd client: %w", err) + } + // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to + // close the client (which we can do by closing rawClient). 
+	defer func() {
+		if err := rawClient.Close(); err != nil {
+			t.Errorf("error closing rawClient: %v", err)
+		}
+	}()
+
+	response, err := etcdClient.Get(context.Background(), path, clientv3.WithPrefix())
+	if err != nil {
+		return nil, fmt.Errorf("failed to retrieve resource from etcd: %w", err)
+	}
+
+	return response, nil
+}
+
+func (svm *svmTest) getRawCRFromETCD(t *testing.T, name, namespace, crdGroup, crdName string) ([]byte, error) {
+	t.Helper()
+	crdETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, crdGroup, crdName, name, namespace)
+	etcdResponse, err := svm.readRawRecordFromETCD(t, crdETCDPath)
+	if err != nil {
+		t.Fatalf("failed to read %s from etcd: %v", crdETCDPath, err)
+	}
+	return etcdResponse.Kvs[0].Value, nil
+}
+
+func (svm *svmTest) updateFile(t *testing.T, configDir, filename string, newContent []byte) {
+	t.Helper()
+	// Create a temporary file
+	tempFile, err := os.CreateTemp(configDir, "tempfile")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := tempFile.Close(); err != nil {
+			t.Errorf("error closing tempFile: %v", err)
+		}
+	}()
+
+	// Write the new content to the temporary file
+	_, err = tempFile.Write(newContent)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Atomically replace the original file with the temporary file
+	err = os.Rename(tempFile.Name(), filepath.Join(configDir, filename))
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func (svm *svmTest) createSVMResource(ctx context.Context, t *testing.T, name string, gvr svmv1alpha1.GroupVersionResource) (
+	*svmv1alpha1.StorageVersionMigration,
+	error,
+) {
+	t.Helper()
+	svmResource := &svmv1alpha1.StorageVersionMigration{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: svmv1alpha1.StorageVersionMigrationSpec{
+			Resource: svmv1alpha1.GroupVersionResource{
+				Group:    gvr.Group,
+				Version:  gvr.Version,
+				Resource: gvr.Resource,
+			},
+		},
+	}
+
+	return svm.client.StoragemigrationV1alpha1().
+		StorageVersionMigrations().
+		Create(ctx, svmResource, metav1.CreateOptions{})
+}
+
+func (svm *svmTest) getSVM(ctx context.Context, t *testing.T, name string) (
+	*svmv1alpha1.StorageVersionMigration,
+	error,
+) {
+	t.Helper()
+	return svm.client.StoragemigrationV1alpha1().
+		StorageVersionMigrations().
+ Get(ctx, name, metav1.GetOptions{}) +} + +func setupAudit(t *testing.T) ( + policyFile *os.File, + logFile *os.File, +) { + t.Helper() + // prepare audit policy file + policyFile, err := os.CreateTemp("", auditPolicyFileName) + if err != nil { + t.Fatalf("Failed to create audit policy file: %v", err) + } + if _, err := policyFile.Write([]byte(resources["auditPolicy"])); err != nil { + t.Fatalf("Failed to write audit policy file: %v", err) + } + + // prepare audit log file + logFile, err = os.CreateTemp("", auditLogFileName) + if err != nil { + t.Fatalf("Failed to create audit log file: %v", err) + } + + return policyFile, logFile +} + +func (svm *svmTest) getAutomaticReloadSuccessTotal(ctx context.Context, t *testing.T) int { + t.Helper() + + copyConfig := rest.CopyConfig(svm.server.ClientConfig) + copyConfig.GroupVersion = &schema.GroupVersion{} + copyConfig.NegotiatedSerializer = unstructuredscheme.NewUnstructuredNegotiatedSerializer() + rc, err := rest.RESTClientFor(copyConfig) + if err != nil { + t.Fatalf("Failed to create REST client: %v", err) + } + + body, err := rc.Get().AbsPath("/metrics").DoRaw(ctx) + if err != nil { + t.Fatal(err) + } + + metricRegex := regexp.MustCompile(fmt.Sprintf(`%s{.*} (\d+)`, metricPrefix)) + for _, line := range strings.Split(string(body), "\n") { + if strings.HasPrefix(line, metricPrefix) { + matches := metricRegex.FindStringSubmatch(line) + if len(matches) == 2 { + metricValue, err := strconv.Atoi(matches[1]) + if err != nil { + t.Fatalf("Failed to convert metric value to integer: %v", err) + } + return metricValue + } + } + } + + return 0 +} + +func (svm *svmTest) isEncryptionConfigFileUpdated(ctx context.Context, t *testing.T, metricBeforeUpdate int) bool { + t.Helper() + + err := wait.PollUntilContextTimeout( + ctx, + 500*time.Millisecond, + wait.ForeverTestTimeout, + true, + func(ctx context.Context) (bool, error) { + metric := svm.getAutomaticReloadSuccessTotal(ctx, t) + return metric == (metricBeforeUpdate + 1), nil + }, + ) + + return err == nil +} + +// waitForResourceMigration checks following conditions: +// 1. The svm resource has SuccessfullyMigrated condition. +// 2. The audit log contains patch events for the given secret. +func (svm *svmTest) waitForResourceMigration( + ctx context.Context, + t *testing.T, + svmName, name string, + expectedEvents int, +) bool { + t.Helper() + + var isMigrated bool + err := wait.PollUntilContextTimeout( + ctx, + 500*time.Millisecond, + wait.ForeverTestTimeout, + true, + func(ctx context.Context) (bool, error) { + svmResource, err := svm.getSVM(ctx, t, svmName) + if err != nil { + t.Fatalf("Failed to get SVM resource: %v", err) + } + if svmResource.Status.ResourceVersion == "" { + return false, nil + } + + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) { + isMigrated = true + } + + // We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration. + // However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes. + // To expedite the update of the GC cache, we create a dummy secret and then promptly delete it. + // This action forces the GC to refresh its cache, enabling us to proceed with the migration. 
+ _, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace) + if err != nil { + t.Fatalf("Failed to create secret: %v", err) + } + err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete secret: %v", err) + } + + stream, err := os.Open(svm.logFile.Name()) + if err != nil { + t.Fatalf("Failed to open audit log file: %v", err) + } + defer func() { + if err := stream.Close(); err != nil { + t.Errorf("error while closing audit log file: %v", err) + } + }() + + missingReport, err := utils.CheckAuditLines( + stream, + []utils.AuditEvent{ + { + Level: auditinternal.LevelMetadata, + Stage: auditinternal.StageResponseComplete, + RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name), + Verb: "patch", + Code: 200, + User: "system:apiserver", + Resource: "secrets", + Namespace: "default", + AuthorizeDecision: "allow", + RequestObject: false, + ResponseObject: false, + }, + }, + auditv1.SchemeGroupVersion, + ) + if err != nil { + t.Fatalf("Failed to check audit log: %v", err) + } + if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) { + isMigrated = false + } + + return isMigrated, nil + }, + ) + if err != nil { + return false + } + + return isMigrated +} + +func (svm *svmTest) createCRD( + t *testing.T, + name, group string, + certCtx *certContext, + crdVersions []apiextensionsv1.CustomResourceDefinitionVersion, +) *apiextensionsv1.CustomResourceDefinition { + t.Helper() + pluralName := name + "s" + listKind := name + "List" + + crd := &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: pluralName + "." 
+ group, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: group, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Kind: name, + ListKind: listKind, + Plural: pluralName, + Singular: name, + }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: crdVersions, + Conversion: &apiextensionsv1.CustomResourceConversion{ + Strategy: apiextensionsv1.WebhookConverter, + Webhook: &apiextensionsv1.WebhookConversion{ + ClientConfig: &apiextensionsv1.WebhookClientConfig{ + CABundle: certCtx.signingCert, + URL: ptr.To( + fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler), + ), + }, + ConversionReviewVersions: []string{"v1", "v2"}, + }, + }, + PreserveUnknownFields: false, + }, + } + + apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig) + if err != nil { + t.Fatalf("Failed to create apiextensions client: %v", err) + } + svm.apiextensionsclient = apiextensionsclient + + etcd.CreateTestCRDs(t, apiextensionsclient, false, crd) + return crd +} + +func (svm *svmTest) updateCRD( + ctx context.Context, + t *testing.T, + crdName string, + updatesCRDVersions []apiextensionsv1.CustomResourceDefinitionVersion, +) *apiextensionsv1.CustomResourceDefinition { + t.Helper() + + var err error + _, err = crdintegration.UpdateV1CustomResourceDefinitionWithRetry(svm.apiextensionsclient, crdName, func(c *apiextensionsv1.CustomResourceDefinition) { + c.Spec.Versions = updatesCRDVersions + }) + if err != nil { + t.Fatalf("Failed to update CRD: %v", err) + } + + crd, err := svm.apiextensionsclient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + + return crd +} + +func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured { + t.Helper() + + crdResource := schema.GroupVersionResource{ + Group: crdGroup, + Version: version, + Resource: crdName + "s", + } + + crdUnstructured := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": crdResource.GroupVersion().String(), + "kind": crdName, + "metadata": map[string]interface{}{ + "name": crName, + "namespace": defaultNamespace, + }, + }, + } + + crdUnstructured, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Create(ctx, crdUnstructured, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create CR: %v", err) + } + + return crdUnstructured +} + +func (svm *svmTest) getCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured { + t.Helper() + + crdResource := schema.GroupVersionResource{ + Group: crdGroup, + Version: version, + Resource: crdName + "s", + } + + cr, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Get(ctx, crName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get CR: %v", err) + } + + return cr +} + +func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) error { + t.Helper() + + crdResource := schema.GroupVersionResource{ + Group: crdGroup, + Version: version, + Resource: crdName + "s", + } + + _, err := svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).List(ctx, metav1.ListOptions{}) + + return err +} + +func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) { + t.Helper() + crdResource := schema.GroupVersionResource{ + Group: crdGroup, + Version: version, + Resource: crdName + "s", + } + err := 
svm.dynamicClient.Resource(crdResource).Namespace(defaultNamespace).Delete(ctx, name, metav1.DeleteOptions{})
+	if err != nil {
+		t.Fatalf("Failed to delete CR: %v", err)
+	}
+}
+
+func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc {
+	t.Helper()
+	http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
+
+	block, _ := pem.Decode(certCtx.key)
+	if block == nil {
+		panic("failed to parse PEM block containing the key")
+	}
+	key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
+	if err != nil {
+		t.Fatalf("Failed to parse private key: %v", err)
+	}
+
+	blockCer, _ := pem.Decode(certCtx.cert)
+	if blockCer == nil {
+		panic("failed to parse PEM block containing the certificate")
+	}
+	webhookCert, err := x509.ParseCertificate(blockCer.Bytes)
+	if err != nil {
+		t.Fatalf("Failed to parse certificate: %v", err)
+	}
+
+	server := &http.Server{
+		Addr: fmt.Sprintf("127.0.0.1:%d", servicePort),
+		TLSConfig: &tls.Config{
+			Certificates: []tls.Certificate{
+				{
+					Certificate: [][]byte{webhookCert.Raw},
+					PrivateKey:  key,
+				},
+			},
+		},
+	}
+
+	go func() {
+		// skipping error handling here because this always returns a non-nil error.
+		// after Server.Shutdown, the returned error is ErrServerClosed.
+		_ = server.ListenAndServeTLS("", "")
+	}()
+
+	serverCtx, cancel := context.WithCancel(ctx)
+	go func(ctx context.Context, t *testing.T) {
+		<-ctx.Done()
+		// Context was cancelled, shut down the server
+		if err := server.Shutdown(context.Background()); err != nil {
+			t.Logf("Failed to shutdown server: %v", err)
+		}
+	}(serverCtx, t)
+
+	return cancel
+}
+
+type certContext struct {
+	cert        []byte
+	key         []byte
+	signingCert []byte
+}
+
+func (svm *svmTest) setupServerCert(t *testing.T) *certContext {
+	t.Helper()
+	certDir, err := os.MkdirTemp("", "test-e2e-server-cert")
+	if err != nil {
+		t.Fatalf("Failed to create a temp dir for cert generation %v", err)
+	}
+	defer func(path string) {
+		err := os.RemoveAll(path)
+		if err != nil {
+			t.Fatalf("Failed to remove temp dir %v", err)
+		}
+	}(certDir)
+	signingKey, err := utils.NewPrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to create CA private key %v", err)
+	}
+	signingCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "e2e-server-cert-ca"}, signingKey)
+	if err != nil {
+		t.Fatalf("Failed to create CA cert for apiserver %v", err)
+	}
+	caCertFile, err := os.CreateTemp(certDir, "ca.crt")
+	if err != nil {
+		t.Fatalf("Failed to create a temp file for ca cert generation %v", err)
+	}
+	defer utiltesting.CloseAndRemove(&testing.T{}, caCertFile)
+	if err := os.WriteFile(caCertFile.Name(), utils.EncodeCertPEM(signingCert), 0644); err != nil {
+		t.Fatalf("Failed to write CA cert %v", err)
+	}
+	key, err := utils.NewPrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to create private key: %v", err)
+	}
+	signedCert, err := utils.NewSignedCert(
+		&cert.Config{
+			CommonName: "127.0.0.1",
+			AltNames: cert.AltNames{
+				IPs: []net.IP{utilnet.ParseIPSloppy("127.0.0.1")},
+			},
+			Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+		},
+		key, signingCert, signingKey,
+	)
+	if err != nil {
+		t.Fatalf("Failed to create cert: %v", err)
+	}
+	certFile, err := os.CreateTemp(certDir, "server.crt")
+	if err != nil {
+		t.Fatalf("Failed to create a temp file for cert generation %v", err)
+	}
+	defer utiltesting.CloseAndRemove(&testing.T{}, certFile)
+	keyFile, err := os.CreateTemp(certDir, "server.key")
+	if err != nil {
+		t.Fatalf("Failed to create a temp file for key
generation %v", err)
+	}
+	if err = os.WriteFile(certFile.Name(), utils.EncodeCertPEM(signedCert), 0600); err != nil {
+		t.Fatalf("Failed to write cert file %v", err)
+	}
+	privateKeyPEM, err := keyutil.MarshalPrivateKeyToPEM(key)
+	if err != nil {
+		t.Fatalf("Failed to marshal key %v", err)
+	}
+	if err = os.WriteFile(keyFile.Name(), privateKeyPEM, 0644); err != nil {
+		t.Fatalf("Failed to write key file %v", err)
+	}
+	defer utiltesting.CloseAndRemove(&testing.T{}, keyFile)
+	return &certContext{
+		cert:        utils.EncodeCertPEM(signedCert),
+		key:         privateKeyPEM,
+		signingCert: utils.EncodeCertPEM(signingCert),
+	}
+}
+
+func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bool {
+	t.Helper()
+
+	data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
+	if err != nil {
+		t.Fatalf("Failed to get CR from etcd: %v", err)
+	}
+
+	// parse data to unstructured.Unstructured
+	obj := &unstructured.Unstructured{}
+	err = obj.UnmarshalJSON(data)
+	if err != nil {
+		t.Fatalf("Failed to unmarshal data to unstructured: %v", err)
+	}
+
+	return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
+}
+
+func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool {
+	t.Helper()
+
+	err := wait.PollUntilContextTimeout(
+		ctx,
+		500*time.Millisecond,
+		wait.ForeverTestTimeout,
+		true,
+		func(ctx context.Context) (bool, error) {
+			triggerCR := svm.createCR(ctx, t, "triggercr", "v1")
+			svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
+			svmResource, err := svm.getSVM(ctx, t, crdSVMName)
+			if err != nil {
+				t.Fatalf("Failed to get SVM resource: %v", err)
+			}
+			if svmResource.Status.ResourceVersion == "" {
+				return false, nil
+			}
+
+			if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
+				return true, nil
+			}
+
+			return false, nil
+		},
+	)
+	return err == nil
+}
+
+type versions struct {
+	generation  int64
+	rv          string
+	isRVUpdated bool
+}
+
+func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) {
+	t.Helper()
+
+	for crName, version := range crVersions {
+		// get CR from etcd
+		data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
+		if err != nil {
+			t.Fatalf("Failed to get CR from etcd: %v", err)
+		}
+
+		// parse data to unstructured.Unstructured
+		obj := &unstructured.Unstructured{}
+		err = obj.UnmarshalJSON(data)
+		if err != nil {
+			t.Fatalf("Failed to unmarshal data to unstructured: %v", err)
+		}
+
+		// validate resourceVersion and generation
+		crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion()
+		if version.isRVUpdated && crVersion == version.rv {
+			t.Fatalf("ResourceVersion of CR %s should have changed after migration. Before: %s, After: %s", crName, version.rv, crVersion)
+		}
+		if obj.GetGeneration() != version.generation {
+			t.Fatalf("Generation of CR %s should not have changed. Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration())
+		}
+	}
+}
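
Note (not part of the patch): for readers of the tests above, a minimal sketch of the user-facing object that createSVMResource builds programmatically, written as a manifest. It assumes the storagemigration.k8s.io/v1alpha1 API served by this branch; the object name and the target resource (secrets, as used by the secret-based tests) are illustrative only.

    # Hypothetical manifest mirroring svmv1alpha1.StorageVersionMigration as constructed in createSVMResource.
    apiVersion: storagemigration.k8s.io/v1alpha1
    kind: StorageVersionMigration
    metadata:
      name: secrets-migration   # illustrative name, e.g. svmName in the tests
    spec:
      resource:
        group: ""               # core API group, as in the secret migration tests
        version: v1
        resource: secrets

The controllers wired up in svmSetup (SVMController plus ResourceVersionController, backed by the garbage collector's dependency graph) watch such objects and flip the MigrationSucceeded condition that isCRDMigrated and waitForResourceMigration poll for.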