diff --git a/cmd/kube-controller-manager/app/storageversionmigrator.go b/cmd/kube-controller-manager/app/storageversionmigrator.go
index d1cb4f2160c..99cf02e0953 100644
--- a/cmd/kube-controller-manager/app/storageversionmigrator.go
+++ b/cmd/kube-controller-manager/app/storageversionmigrator.go
@@ -54,7 +54,11 @@ func startSVMController(
         return nil, true, fmt.Errorf("storage version migrator requires garbage collector")
     }
 
+    // svm controller can make a lot of requests during migration, keep it fast
     config := controllerContext.ClientBuilder.ConfigOrDie(controllerName)
+    config.QPS *= 20
+    config.Burst *= 100
+
     client := controllerContext.ClientBuilder.ClientOrDie(controllerName)
 
     informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()
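A rough sketch of what the multipliers above mean on the client side, assuming the controller-manager's usual baseline of 20 QPS and 30 burst (an assumption for illustration, not something this hunk pins down); client-go turns these two rest.Config fields into a token-bucket rate limiter when no explicit RateLimiter is set:

package main

import (
    "fmt"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/flowcontrol"
)

func main() {
    // Assumed baseline handed out by the controller-manager's client builder.
    config := &rest.Config{QPS: 20, Burst: 30}

    // Same scaling as startSVMController above.
    config.QPS *= 20    // 400 requests/second sustained
    config.Burst *= 100 // bursts of up to 3000 requests

    // client-go builds a token bucket from these two fields.
    limiter := flowcontrol.NewTokenBucketRateLimiter(config.QPS, config.Burst)
    fmt.Printf("sustained %.0f req/s, burst %d\n", limiter.QPS(), config.Burst)
}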
diff --git a/pkg/controller/storageversionmigrator/resourceversion.go b/pkg/controller/storageversionmigrator/resourceversion.go
index 169615bfdc0..7ce40ba4113 100644
--- a/pkg/controller/storageversionmigrator/resourceversion.go
+++ b/pkg/controller/storageversionmigrator/resourceversion.go
@@ -199,7 +199,7 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error
             StorageVersionMigrations().
             UpdateStatus(
                 ctx,
-                setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
+                setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource does not exist in discovery"),
                 metav1.UpdateOptions{},
             )
         if err != nil {
diff --git a/pkg/controller/storageversionmigrator/storageversionmigrator.go b/pkg/controller/storageversionmigrator/storageversionmigrator.go
index a1284f75b21..e90653751d7 100644
--- a/pkg/controller/storageversionmigrator/storageversionmigrator.go
+++ b/pkg/controller/storageversionmigrator/storageversionmigrator.go
@@ -204,27 +204,35 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
     }
     gvr := getGVRFromResource(toBeProcessedSVM)
 
-    resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr)
+    // prevent unsynced monitor from blocking forever
+    // use a short timeout so that we can fail quickly and possibly handle other migrations while this monitor gets ready.
+    monCtx, monCtxCancel := context.WithTimeout(ctx, 10*time.Second)
+    defer monCtxCancel()
+    resourceMonitor, errMonitor := svmc.dependencyGraphBuilder.GetMonitor(monCtx, gvr)
     if resourceMonitor != nil {
-        if err != nil {
+        if errMonitor != nil {
             // non nil monitor indicates that error is due to resource not being synced
-            return fmt.Errorf("dependency graph is not synced, requeuing to attempt again")
+            return fmt.Errorf("dependency graph is not synced, requeuing to attempt again: %w", errMonitor)
         }
     } else {
+        logger.V(4).Error(errMonitor, "resource does not exist in GC", "gvr", gvr.String())
+
+        // our GC cache could be missing a recently created custom resource, so give it some time to catch up
+        // we resync discovery every 30 seconds so twice that should be sufficient
+        if toBeProcessedSVM.CreationTimestamp.Add(time.Minute).After(time.Now()) {
+            return fmt.Errorf("resource does not exist in GC, requeuing to attempt again: %w", errMonitor)
+        }
+
         // we can't migrate a resource that doesn't exist in the GC
-        _, err = svmc.kubeClient.StoragemigrationV1alpha1().
+        _, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
             StorageVersionMigrations().
             UpdateStatus(
                 ctx,
-                setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
+                setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource not found"),
                 metav1.UpdateOptions{},
             )
-        if err != nil {
-            return err
-        }
-        logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String())
-        return nil
+        return errStatus
     }
 
     gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion())
@@ -244,7 +252,7 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
         StorageVersionMigrations().
         UpdateStatus(
             ctx,
-            setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason),
+            setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason, ""),
             metav1.UpdateOptions{},
         )
     if err != nil {
@@ -255,60 +263,72 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
     if err != nil {
         return err
     }
-    typeMeta := metav1.TypeMeta{}
-    typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
-    data, err := json.Marshal(typeMeta)
-    if err != nil {
-        return err
-    }
 
     // ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure
 
     // process storage migration
-    for _, gvrKey := range resourceMonitor.Store.ListKeys() {
-        namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey)
+    for _, obj := range resourceMonitor.Store.List() {
+        accessor, err := meta.Accessor(obj)
         if err != nil {
             return err
         }
 
-        _, err = svmc.dynamicClient.Resource(gvr).
-            Namespace(namespace).
+        typeMeta := typeMetaUIDRV{}
+        typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
+        // set UID so that when a resource gets deleted, we get an "uid mismatch"
+        // conflict error instead of trying to create it.
+        typeMeta.UID = accessor.GetUID()
+        // set RV so that when a resource gets updated or deleted+recreated, we get an "object has been modified"
+        // conflict error. we do not actually need to do anything special for the updated case because if RV
+        // was not set, it would just result in a no-op request. but for the deleted+recreated case, if RV is
+        // not set but UID is set, we would get an immutable field validation error. hence we must set both.
+        typeMeta.ResourceVersion = accessor.GetResourceVersion()
+        data, err := json.Marshal(typeMeta)
+        if err != nil {
+            return err
+        }
+
+        _, errPatch := svmc.dynamicClient.Resource(gvr).
+            Namespace(accessor.GetNamespace()).
             Patch(ctx,
-                name,
+                accessor.GetName(),
                 types.ApplyPatchType,
                 data,
                 metav1.PatchOptions{
                     FieldManager: svmc.controllerName,
                 },
             )
-        if err != nil {
-            // in case of NotFound or Conflict, we can stop processing migration for that resource
-            if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
-                continue
-            }
-            _, err = svmc.kubeClient.StoragemigrationV1alpha1().
+
+        // in case of conflict, we can stop processing migration for that resource because it has either been
+        // - updated, meaning that migration has already been performed
+        // - deleted, meaning that migration is not needed
+        // - deleted and recreated, meaning that migration has already been performed
+        if apierrors.IsConflict(errPatch) {
+            logger.V(6).Info("Resource ignored due to conflict", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "err", errPatch)
+            continue
+        }
+
+        if errPatch != nil {
+            logger.V(4).Error(errPatch, "Failed to migrate the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "reason", apierrors.ReasonForError(errPatch))
+
+            _, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
                 StorageVersionMigrations().
                 UpdateStatus(
                     ctx,
-                    setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
+                    setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "migration encountered unhandled error"),
                     metav1.UpdateOptions{},
                 )
-            if err != nil {
-                return err
-            }
-            logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err))
-            return nil
+
+            return errStatus
             // Todo: add retry for scenarios where API server returns rate limiting error
         }
-        logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String())
+
+        logger.V(4).Info("Successfully migrated the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String())
     }
 
     _, err = svmc.kubeClient.StoragemigrationV1alpha1().
         StorageVersionMigrations().
         UpdateStatus(
             ctx,
-            setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason),
+            setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason, ""),
             metav1.UpdateOptions{},
         )
     if err != nil {
@@ -318,3 +338,13 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
     logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime))
     return nil
 }
+
+type typeMetaUIDRV struct {
+    metav1.TypeMeta    `json:",inline"`
+    objectMetaUIDandRV `json:"metadata,omitempty"`
+}
+
+type objectMetaUIDandRV struct {
+    UID             types.UID `json:"uid,omitempty"`
+    ResourceVersion string    `json:"resourceVersion,omitempty"`
+}
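The apply patch built in the loop above deliberately carries nothing but apiVersion/kind and the observed uid and resourceVersion, so a live object is rewritten at the current storage version as a no-op while anything that was updated, deleted, or recreated comes back as a conflict. A standalone sketch of that request shape follows; the client wiring, GVR, and the "TestCRD" kind are illustrative assumptions, not part of this patch:

package example

import (
    "context"
    "encoding/json"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
)

// minimal apply payload: TypeMeta plus only uid and resourceVersion in metadata,
// mirroring typeMetaUIDRV/objectMetaUIDandRV above.
type applyIdentity struct {
    metav1.TypeMeta `json:",inline"`
    Metadata        struct {
        UID             types.UID `json:"uid,omitempty"`
        ResourceVersion string    `json:"resourceVersion,omitempty"`
    } `json:"metadata,omitempty"`
}

func migrateOne(ctx context.Context, cfg *rest.Config, gvr schema.GroupVersionResource, namespace, name string, uid types.UID, rv string) error {
    client, err := dynamic.NewForConfig(cfg)
    if err != nil {
        return err
    }

    patch := applyIdentity{}
    patch.APIVersion, patch.Kind = gvr.GroupVersion().String(), "TestCRD" // kind is an assumed example
    patch.Metadata.UID = uid
    patch.Metadata.ResourceVersion = rv

    data, err := json.Marshal(patch)
    if err != nil {
        return err
    }

    // Server-side apply of the empty object: a no-op write for a live object,
    // a conflict for anything that changed or went away since it was listed.
    _, err = client.Resource(gvr).Namespace(namespace).Patch(ctx, name, types.ApplyPatchType, data,
        metav1.PatchOptions{FieldManager: "example-migrator"})
    if apierrors.IsConflict(err) {
        fmt.Println("skipping, already handled:", name)
        return nil
    }
    return err
}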
diff --git a/pkg/controller/storageversionmigrator/util.go b/pkg/controller/storageversionmigrator/util.go
index 08f6efc0188..816f85f1b39 100644
--- a/pkg/controller/storageversionmigrator/util.go
+++ b/pkg/controller/storageversionmigrator/util.go
@@ -62,7 +62,7 @@ func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType sv
 func setStatusConditions(
     toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration,
     conditionType svmv1alpha1.MigrationConditionType,
-    reason string,
+    reason, message string,
 ) *svmv1alpha1.StorageVersionMigration {
     if !IsConditionTrue(toBeUpdatedSVM, conditionType) {
         if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed {
@@ -77,6 +77,7 @@ func setStatusConditions(
             Status:         corev1.ConditionTrue,
             LastUpdateTime: metav1.Now(),
             Reason:         reason,
+            Message:        message,
         })
     }
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
index c2c6ac442f4..ea28632ccf6 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go
@@ -486,7 +486,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
             Name: saRolePrefix + "storage-version-migrator-controller",
         },
         Rules: []rbacv1.PolicyRule{
-            rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(),
+            // need list to get current RV for any resource
+            // need patch for SSA of any resource
+            // need create because SSA of a deleted resource will be interpreted as a create request; these always fail with a conflict error because UID is set
+            rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(),
             rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(),
         },
     })
diff --git a/test/images/agnhost/crd-conversion-webhook/converter/framework.go b/test/images/agnhost/crd-conversion-webhook/converter/framework.go
index 9df06c05ca9..c16a684f0d3 100644
--- a/test/images/agnhost/crd-conversion-webhook/converter/framework.go
+++ b/test/images/agnhost/crd-conversion-webhook/converter/framework.go
@@ -17,6 +17,7 @@ limitations under the License.
 package converter
 
 import (
+    "bytes"
     "fmt"
     "io"
     "net/http"
@@ -131,7 +132,7 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
         return
     }
 
-    klog.V(2).Infof("handling request: %v", body)
+    klog.V(2).Infof("handling request: %s", string(body))
     obj, gvk, err := serializer.Decode(body, nil, nil)
     if err != nil {
         msg := fmt.Sprintf("failed to deserialize body (%v) with error %v", string(body), err)
@@ -152,7 +153,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
         }
         convertReview.Response = doConversionV1beta1(convertReview.Request, convert)
         convertReview.Response.UID = convertReview.Request.UID
-        klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
 
         // reset the request, it is not needed in a response.
         convertReview.Request = &v1beta1.ConversionRequest{}
@@ -167,7 +167,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
         }
         convertReview.Response = doConversionV1(convertReview.Request, convert)
         convertReview.Response.UID = convertReview.Request.UID
-        klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
 
         // reset the request, it is not needed in a response.
         convertReview.Request = &v1.ConversionRequest{}
@@ -187,12 +186,14 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
         http.Error(w, msg, http.StatusBadRequest)
         return
     }
-    err = outSerializer.Encode(responseObj, w)
+    var buf bytes.Buffer
+    err = outSerializer.Encode(responseObj, io.MultiWriter(w, &buf))
     if err != nil {
         klog.Error(err)
         http.Error(w, err.Error(), http.StatusInternalServerError)
         return
     }
+    klog.V(2).Infof("sending response: %s", buf.String())
 }
 
 // ServeExampleConvert servers endpoint for the example converter defined as convertExampleCRD function.
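The last hunk above encodes the conversion response once into an io.MultiWriter so the exact bytes streamed to the client can also be logged afterwards. A minimal standalone illustration of the same pattern, with a placeholder payload and plain log calls standing in for klog:

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log"
    "net/http"
)

func handle(w http.ResponseWriter, r *http.Request) {
    response := map[string]string{"status": "converted"} // stand-in for the ConversionReview response

    // Encode once, writing to the client and to a buffer at the same time,
    // instead of encoding twice or logging the unencoded object.
    var buf bytes.Buffer
    if err := json.NewEncoder(io.MultiWriter(w, &buf)).Encode(response); err != nil {
        log.Print(err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    log.Printf("sending response: %s", buf.String())
}

func main() {
    http.HandleFunc("/convert", handle)
    log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
}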
diff --git a/test/integration/storageversionmigrator/storageversionmigrator_test.go b/test/integration/storageversionmigrator/storageversionmigrator_test.go
index 92961a1b6cd..581064e4af0 100644
--- a/test/integration/storageversionmigrator/storageversionmigrator_test.go
+++ b/test/integration/storageversionmigrator/storageversionmigrator_test.go
@@ -19,20 +19,24 @@ package storageversionmigrator
 import (
     "bytes"
     "context"
+    "strconv"
+    "sync"
     "testing"
     "time"
 
-    etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
-    "k8s.io/klog/v2/ktesting"
+    "go.uber.org/goleak"
 
     svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
+    etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     clientgofeaturegate "k8s.io/client-go/features"
     "k8s.io/component-base/featuregate"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
+    "k8s.io/klog/v2/ktesting"
     "k8s.io/kubernetes/pkg/features"
+    "k8s.io/kubernetes/test/integration/framework"
 )
 
 // TestStorageVersionMigration is an integration test that verifies storage version migration works.
@@ -50,7 +54,7 @@ func TestStorageVersionMigration(t *testing.T) {
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
 
     // this makes the test super responsive. It's set to a default of 1 minute.
-    encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Millisecond
+    encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
 
     _, ctx := ktesting.NewTestContext(t)
     ctx, cancel := context.WithCancel(ctx)
@@ -152,7 +156,12 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
     // decode errors are expected when using conversation webhooks
     etcd3watcher.TestOnlySetFatalOnDecodeError(false)
-    defer etcd3watcher.TestOnlySetFatalOnDecodeError(true)
+    t.Cleanup(func() { etcd3watcher.TestOnlySetFatalOnDecodeError(true) })
+    framework.GoleakCheck(t, // block test clean up and let any lingering watches complete before making decode errors fatal again
+        goleak.IgnoreTopFunction("k8s.io/kubernetes/vendor/gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
+        goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
+        goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"),
+    )
 
     _, ctx := ktesting.NewTestContext(t)
     ctx, cancel := context.WithCancel(ctx)
@@ -163,6 +172,9 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
     svmTest := svmSetup(ctx, t)
     certCtx := svmTest.setupServerCert(t)
 
+    // simulate monkeys creating and deleting CRs
+    svmTest.createChaos(ctx, t)
+
     // create CRD with v1 serving and storage
     crd := svmTest.createCRD(t, crdName, crdGroup, certCtx, v1CRDVersion)
 
@@ -238,7 +250,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
     if err != nil {
         t.Fatalf("Failed to create SVM resource: %v", err)
     }
-    if ok := svmTest.isCRDMigrated(ctx, t, svm.Name); !ok {
+    if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok {
         t.Fatalf("CRD not migrated")
     }
 
@@ -261,10 +273,73 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
     shutdownServer()
 
     // assert RV and Generations of CRs
-    svmTest.validateRVAndGeneration(ctx, t, crVersions)
+    svmTest.validateRVAndGeneration(ctx, t, crVersions, "v2")
 
     // assert v2 CRs can be listed
     if err := svmTest.listCR(ctx, t, "v2"); err != nil {
         t.Fatalf("Failed to list CRs at version v2: %v", err)
     }
 }
+
+// TestStorageVersionMigrationDuringChaos serves as a stress test for the SVM controller.
+// It creates a CRD and a reasonable number of static instances for that resource.
+// It also continuously creates and deletes instances of that resource.
+// During all of this, it attempts to perform multiple parallel migrations of the resource.
+// It asserts that all migrations are successful and that none of the static instances
+// were changed after they were initially created (as the migrations must be a no-op).
+func TestStorageVersionMigrationDuringChaos(t *testing.T) {
+    featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)
+    featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
+
+    _, ctx := ktesting.NewTestContext(t)
+    ctx, cancel := context.WithCancel(ctx)
+    t.Cleanup(cancel)
+
+    svmTest := svmSetup(ctx, t)
+
+    svmTest.createChaos(ctx, t)
+
+    crd := svmTest.createCRD(t, crdName, crdGroup, nil, v1CRDVersion)
+
+    crVersions := make(map[string]versions)
+
+    for i := range 50 { // a more realistic number of total resources
+        cr := svmTest.createCR(ctx, t, "created-cr-"+strconv.Itoa(i), "v1")
+        crVersions[cr.GetName()] = versions{
+            generation:  cr.GetGeneration(),
+            rv:          cr.GetResourceVersion(),
+            isRVUpdated: false, // none of these CRs should change due to migrations
+        }
+    }
+
+    var wg sync.WaitGroup
+    const migrations = 10 // more than the total workers of SVM
+    wg.Add(migrations)
+    for i := range migrations {
+        i := i
+        go func() {
+            defer wg.Done()
+
+            svm, err := svmTest.createSVMResource(
+                ctx, t, "chaos-svm-"+strconv.Itoa(i),
+                svmv1alpha1.GroupVersionResource{
+                    Group:    crd.Spec.Group,
+                    Version:  "v1",
+                    Resource: crd.Spec.Names.Plural,
+                },
+            )
+            if err != nil {
+                t.Errorf("Failed to create SVM resource: %v", err)
+                return
+            }
+            triggerCRName := "chaos-trigger-" + strconv.Itoa(i)
+            if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok {
+                t.Errorf("CRD not migrated")
+                return
+            }
+        }()
+    }
+    wg.Wait()
+
+    svmTest.validateRVAndGeneration(ctx, t, crVersions, "v1")
+}
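framework.GoleakCheck used above is a Kubernetes integration-test helper; outside that harness the same gate can be approximated directly with goleak by registering the verification as the first cleanup so it runs last, after the servers and watches started by the test have shut down. The ignore list mirrors the one used above and is an assumption about which background goroutines are benign:

package example

import (
    "testing"

    "go.uber.org/goleak"
)

func TestNoLeakedGoroutines(t *testing.T) {
    // Registered first so it runs last, after every other t.Cleanup has torn the servers down.
    t.Cleanup(func() {
        goleak.VerifyNone(t,
            goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
            goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"),
        )
    })

    // ... start the API server, controllers, and watches under test here ...
}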
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/keyutil" utiltesting "k8s.io/client-go/util/testing" - "k8s.io/controller-manager/pkg/informerfactory" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" - "k8s.io/kubernetes/cmd/kube-controller-manager/names" - "k8s.io/kubernetes/pkg/controller/garbagecollector" + kubecontrollermanagertesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing" "k8s.io/kubernetes/pkg/controller/storageversionmigrator" "k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/utils" + "k8s.io/kubernetes/test/utils/kubeconfig" utilnet "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -107,6 +104,11 @@ rules: - group: "" resources: ["secrets"] verbs: ["patch"] + - level: Metadata + resources: + - group: "stable.example.com" + resources: ["testcrds"] + users: ["system:serviceaccount:kube-system:storage-version-migrator-controller"] `, "initialEncryptionConfig": ` kind: EncryptionConfiguration @@ -272,81 +274,32 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { "--audit-log-version", "audit.k8s.io/v1", "--audit-log-mode", "blocking", "--audit-log-path", logFile.Name(), + "--authorization-mode=RBAC", } storageConfig := framework.SharedEtcd() server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig) + kubeConfigFile := createKubeConfigFileForRestConfig(t, server.ClientConfig) + + kcm := kubecontrollermanagertesting.StartTestServerOrDie(ctx, []string{ + "--kubeconfig=" + kubeConfigFile, + "--controllers=garbagecollector,svm", // these are the only controllers needed for this test + "--use-service-account-credentials=true", // exercise RBAC of SVM controller + "--leader-elect=false", // KCM leader election calls os.Exit when it ends, so it is easier to just turn it off altogether + }) + clientSet, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("error in create clientset: %v", err) } - - discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig) if err != nil { t.Fatalf("failed to create discovery client: %v", err) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) - restMapper.Reset() - metadataClient, err := metadata.NewForConfig(server.ClientConfig) - if err != nil { - t.Fatalf("failed to create metadataClient: %v", err) - } dynamicClient, err := dynamic.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("error in create dynamic client: %v", err) } - sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) - metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) - alwaysStarted := make(chan struct{}) - close(alwaysStarted) - - gc, err := garbagecollector.NewGarbageCollector( - ctx, - clientSet, - metadataClient, - restMapper, - garbagecollector.DefaultIgnoredResources(), - informerfactory.NewInformerFactory(sharedInformers, metadataInformers), - alwaysStarted, - ) - if err != nil { - t.Fatalf("error while creating garbage collector: %v", err) - - } - startGC := func() { - syncPeriod := 5 * time.Second - go wait.Until(func() { - restMapper.Reset() - }, syncPeriod, ctx.Done()) - go gc.Run(ctx, 1) - go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) - } - - svmController := storageversionmigrator.NewSVMController( - ctx, - clientSet, - 
dynamicClient, - sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), - names.StorageVersionMigratorController, - restMapper, - gc.GetDependencyGraphBuilder(), - ) - - rvController := storageversionmigrator.NewResourceVersionController( - ctx, - clientSet, - rvDiscoveryClient, - metadataClient, - sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), - restMapper, - ) - - // Start informer and controllers - sharedInformers.Start(ctx.Done()) - startGC() - go svmController.Run(ctx) - go rvController.Run(ctx) svmTest := &svmTest{ storageConfig: storageConfig, @@ -361,6 +314,19 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { } t.Cleanup(func() { + var validCodes = sets.New[int32](http.StatusOK, http.StatusConflict) // make sure SVM controller never creates + _ = svmTest.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { + if event.User != "system:serviceaccount:kube-system:storage-version-migrator-controller" { + return false + } + if !validCodes.Has(event.Code) { + t.Errorf("svm controller had invalid response code for event: %#v", event) + return true + } + return false + }) + + kcm.TearDownFn() server.TearDownFn() utiltesting.CloseAndRemove(t, svmTest.logFile) utiltesting.CloseAndRemove(t, svmTest.policyFile) @@ -373,6 +339,18 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { return svmTest } +func createKubeConfigFileForRestConfig(t *testing.T, restConfig *rest.Config) string { + t.Helper() + + clientConfig := kubeconfig.CreateKubeConfig(restConfig) + + kubeConfigFile := filepath.Join(t.TempDir(), "kubeconfig.yaml") + if err := clientcmd.WriteToFile(*clientConfig, kubeConfigFile); err != nil { + t.Fatal(err) + } + return kubeConfigFile +} + func createEncryptionConfig(t *testing.T, encryptionConfig string) ( filePathForEncryptionConfig string, err error, @@ -606,82 +584,133 @@ func (svm *svmTest) waitForResourceMigration( ) bool { t.Helper() - var isMigrated bool + var triggerOnce sync.Once + err := wait.PollUntilContextTimeout( ctx, 500*time.Millisecond, - wait.ForeverTestTimeout, + 5*time.Minute, true, func(ctx context.Context) (bool, error) { svmResource, err := svm.getSVM(ctx, t, svmName) if err != nil { t.Fatalf("Failed to get SVM resource: %v", err) } + + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) { + t.Logf("%q SVM has failed migration, %#v", svmName, svmResource.Status.Conditions) + return false, fmt.Errorf("SVM has failed migration") + } + if svmResource.Status.ResourceVersion == "" { + t.Logf("%q SVM has no resourceVersion", svmName) return false, nil } if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) { - isMigrated = true + t.Logf("%q SVM has completed migration", svmName) + return true, nil } + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) { + t.Logf("%q SVM migration is running, %#v", svmName, svmResource.Status.Conditions) + return false, nil + } + + t.Logf("%q SVM has not started migration, %#v", svmName, svmResource.Status.Conditions) + // We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration. // However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes. // To expedite the update of the GC cache, we create a dummy secret and then promptly delete it. 
@@ -606,82 +584,133 @@
 ) bool {
     t.Helper()
 
-    var isMigrated bool
+    var triggerOnce sync.Once
+
     err := wait.PollUntilContextTimeout(
         ctx,
         500*time.Millisecond,
-        wait.ForeverTestTimeout,
+        5*time.Minute,
         true,
         func(ctx context.Context) (bool, error) {
             svmResource, err := svm.getSVM(ctx, t, svmName)
             if err != nil {
                 t.Fatalf("Failed to get SVM resource: %v", err)
             }
+
+            if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
+                t.Logf("%q SVM has failed migration, %#v", svmName, svmResource.Status.Conditions)
+                return false, fmt.Errorf("SVM has failed migration")
+            }
+
             if svmResource.Status.ResourceVersion == "" {
+                t.Logf("%q SVM has no resourceVersion", svmName)
                 return false, nil
             }
 
             if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
-                isMigrated = true
+                t.Logf("%q SVM has completed migration", svmName)
+                return true, nil
             }
 
+            if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
+                t.Logf("%q SVM migration is running, %#v", svmName, svmResource.Status.Conditions)
+                return false, nil
+            }
+
+            t.Logf("%q SVM has not started migration, %#v", svmName, svmResource.Status.Conditions)
+
             // We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration.
             // However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes.
             // To expedite the update of the GC cache, we create a dummy secret and then promptly delete it.
             // This action forces the GC to refresh its cache, enabling us to proceed with the migration.
-            _, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
-            if err != nil {
-                t.Fatalf("Failed to create secret: %v", err)
-            }
-            err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{})
-            if err != nil {
-                t.Fatalf("Failed to delete secret: %v", err)
-            }
-
-            stream, err := os.Open(svm.logFile.Name())
-            if err != nil {
-                t.Fatalf("Failed to open audit log file: %v", err)
-            }
-            defer func() {
-                if err := stream.Close(); err != nil {
-                    t.Errorf("error while closing audit log file: %v", err)
+            // At this point we know that the RV has been set on the SVM resource, so the trigger will always have a higher RV.
+            // We only need to do this once.
+            triggerOnce.Do(func() {
+                _, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
+                if err != nil {
+                    t.Fatalf("Failed to create secret: %v", err)
                 }
-            }()
+                err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{})
+                if err != nil {
+                    t.Fatalf("Failed to delete secret: %v", err)
+                }
+            })
 
-            missingReport, err := utils.CheckAuditLines(
-                stream,
-                []utils.AuditEvent{
-                    {
-                        Level:             auditinternal.LevelMetadata,
-                        Stage:             auditinternal.StageResponseComplete,
-                        RequestURI:        fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
-                        Verb:              "patch",
-                        Code:              200,
-                        User:              "system:apiserver",
-                        Resource:          "secrets",
-                        Namespace:         "default",
-                        AuthorizeDecision: "allow",
-                        RequestObject:     false,
-                        ResponseObject:    false,
-                    },
-                },
-                auditv1.SchemeGroupVersion,
-            )
-            if err != nil {
-                t.Fatalf("Failed to check audit log: %v", err)
-            }
-            if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) {
-                isMigrated = false
-            }
-
-            return isMigrated, nil
+            return false, nil
         },
     )
     if err != nil {
+        t.Logf("Failed to wait for resource migration for SVM %q with secret %q: %v", svmName, name, err)
         return false
     }
 
-    return isMigrated
+    err = wait.PollUntilContextTimeout(
+        ctx,
+        500*time.Millisecond,
+        wait.ForeverTestTimeout,
+        true,
+        func(_ context.Context) (bool, error) {
+            want := utils.AuditEvent{
+                Level:             auditinternal.LevelMetadata,
+                Stage:             auditinternal.StageResponseComplete,
+                RequestURI:        fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
+                Verb:              "patch",
+                Code:              http.StatusOK,
+                User:              "system:serviceaccount:kube-system:storage-version-migrator-controller",
+                Resource:          "secrets",
+                Namespace:         "default",
+                AuthorizeDecision: "allow",
+                RequestObject:     false,
+                ResponseObject:    false,
+            }
+
+            if seen := svm.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { return reflect.DeepEqual(event, want) }); expectedEvents > seen {
+                t.Logf("audit log did not contain %d expected audit events, only has %d", expectedEvents, seen)
+                return false, nil
+            }
+
+            return true, nil
+        },
+    )
+    if err != nil {
+        t.Logf("Failed to wait for audit logs events for SVM %q with secret %q: %v", svmName, name, err)
+        return false
+    }
+
+    return true
+}
+
+func (svm *svmTest) countMatchingAuditEvents(t *testing.T, f func(utils.AuditEvent) bool) int {
+    t.Helper()
+
+    var seen int
+    for _, event := range svm.getAuditEvents(t) {
+        if f(event) {
+            seen++
+        }
+    }
+    return seen
+}
+
+func (svm *svmTest) getAuditEvents(t *testing.T) []utils.AuditEvent {
+    t.Helper()
+
+    stream, err := os.Open(svm.logFile.Name())
+    if err != nil {
+        t.Fatalf("Failed to open audit log file: %v", err)
+    }
+    defer func() {
+        if err := stream.Close(); err != nil {
+            t.Errorf("error while closing audit log file: %v", err)
+        }
+    }()
+
+    missingReport, err := utils.CheckAuditLines(stream, nil, auditv1.SchemeGroupVersion)
+    if err != nil {
+        t.Fatalf("Failed to check audit log: %v", err)
+    }
+
+    return missingReport.AllEvents
 }
@@ -706,24 +735,27 @@ func (svm *svmTest) createCRD(
                 Plural:   pluralName,
                 Singular: name,
             },
-            Scope:    apiextensionsv1.NamespaceScoped,
-            Versions: crdVersions,
-            Conversion: &apiextensionsv1.CustomResourceConversion{
-                Strategy: apiextensionsv1.WebhookConverter,
-                Webhook: &apiextensionsv1.WebhookConversion{
-                    ClientConfig: &apiextensionsv1.WebhookClientConfig{
-                        CABundle: certCtx.signingCert,
-                        URL: ptr.To(
-                            fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler),
-                        ),
-                    },
-                    ConversionReviewVersions: []string{"v1", "v2"},
-                },
-            },
+            Scope:                 apiextensionsv1.NamespaceScoped,
+            Versions:              crdVersions,
             PreserveUnknownFields: false,
         },
     }
+    if certCtx != nil {
+        crd.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
+            Strategy: apiextensionsv1.WebhookConverter,
+            Webhook: &apiextensionsv1.WebhookConversion{
+                ClientConfig: &apiextensionsv1.WebhookClientConfig{
+                    CABundle: certCtx.signingCert,
+                    URL: ptr.To(
+                        fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler),
+                    ),
+                },
+                ConversionReviewVersions: []string{"v1", "v2"},
+            },
+        }
+    }
+
     apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig)
     if err != nil {
         t.Fatalf("Failed to create apiextensions client: %v", err)
@@ -809,7 +841,12 @@ func (svm *svmTest) waitForCRDUpdate(
     }
 }
 
-func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured {
+type testingT interface {
+    Helper()
+    Fatalf(format string, args ...any)
+}
+
+func (svm *svmTest) createCR(ctx context.Context, t testingT, crName, version string) *unstructured.Unstructured {
    t.Helper()
 
     crdResource := schema.GroupVersionResource{
@@ -868,7 +905,7 @@ func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) er
     return err
 }
 
-func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) {
+func (svm *svmTest) deleteCR(ctx context.Context, t testingT, name, version string) {
     t.Helper()
     crdResource := schema.GroupVersionResource{
         Group:    crdGroup,
@@ -883,7 +920,9 @@ func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version st
 
 func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc {
     t.Helper()
-    http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
+
+    mux := http.NewServeMux()
+    mux.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
 
     block, _ := pem.Decode(certCtx.key)
     if block == nil {
@@ -904,7 +943,8 @@ func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, c
     }
 
     server := &http.Server{
-        Addr: fmt.Sprintf("127.0.0.1:%d", servicePort),
+        Addr:    fmt.Sprintf("127.0.0.1:%d", servicePort),
+        Handler: mux,
         TLSConfig: &tls.Config{
             Certificates: []tls.Certificate{
                 {
@@ -1030,29 +1070,53 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo
     return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
 }
 
-func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool {
+func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool {
     t.Helper()
 
+    var triggerOnce sync.Once
+
     err := wait.PollUntilContextTimeout(
         ctx,
         500*time.Millisecond,
-        1*time.Minute,
+        5*time.Minute,
         true,
         func(ctx context.Context) (bool, error) {
-            triggerCR := svm.createCR(ctx, t, "triggercr", "v1")
-            svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
             svmResource, err := svm.getSVM(ctx, t, crdSVMName)
             if err != nil {
                 t.Fatalf("Failed to get SVM resource: %v", err)
             }
+
+            if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
+                t.Logf("%q SVM has failed migration, %#v", crdSVMName, svmResource.Status.Conditions)
+                return false, fmt.Errorf("SVM has failed migration")
+            }
+
             if svmResource.Status.ResourceVersion == "" {
+                t.Logf("%q SVM has no resourceVersion", crdSVMName)
                 return false, nil
             }
 
             if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
+                t.Logf("%q SVM has completed migration", crdSVMName)
                 return true, nil
             }
 
+            if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
+                t.Logf("%q SVM migration is running, %#v", crdSVMName, svmResource.Status.Conditions)
+                return false, nil
+            }
+
+            t.Logf("%q SVM has not started migration, %#v", crdSVMName, svmResource.Status.Conditions)
+
+            // at this point we know that the RV has been set on the SVM resource,
+            // and we need to make sure that the GC list RV has caught up to that without waiting for a watch bookmark.
+            // we cannot trigger this any earlier as the rest mapper of the RV controller can be delayed
+            // and thus may not have observed the new CRD yet. we only need to do this once.
+            triggerOnce.Do(func() {
+                triggerCR := svm.createCR(ctx, t, triggerCRName, "v1")
+                svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
+            })
+
             return false, nil
         },
     )
@@ -1065,7 +1129,7 @@ type versions struct {
     isRVUpdated bool
 }
 
-func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) {
+func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions, getCRVersion string) {
     t.Helper()
 
     for crName, version := range crVersions {
@@ -1083,12 +1147,53 @@ func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, c
         }
 
         // validate resourceVersion and generation
-        crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion()
-        if version.isRVUpdated && crVersion == version.rv {
+        crVersion := svm.getCR(ctx, t, crName, getCRVersion).GetResourceVersion()
+        isRVUnchanged := crVersion == version.rv
+        if version.isRVUpdated && isRVUnchanged {
             t.Fatalf("ResourceVersion of CR %s should not be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
         }
+        if !version.isRVUpdated && !isRVUnchanged {
+            t.Fatalf("ResourceVersion of CR %s should be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
+        }
 
         if obj.GetGeneration() != version.generation {
             t.Fatalf("Generation of CR %s should be equal. Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration())
         }
     }
 }
+
+func (svm *svmTest) createChaos(ctx context.Context, t *testing.T) {
+    var wg sync.WaitGroup
+    ctx, cancel := context.WithCancel(ctx)
+
+    noFailT := ignoreFailures{} // these create and delete requests are not coordinated with the rest of the test and can fail
+
+    const workers = 10
+    wg.Add(workers)
+    for i := range workers {
+        i := i
+        go func() {
+            defer wg.Done()
+
+            for {
+                select {
+                case <-ctx.Done():
+                    return
+                default:
+                }
+
+                _ = svm.createCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1")
+                svm.deleteCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1")
+            }
+        }()
+    }
+
+    t.Cleanup(func() {
+        cancel()
+        wg.Wait()
+    })
+}
+
+type ignoreFailures struct{}
+
+func (ignoreFailures) Helper()                           {}
+func (ignoreFailures) Fatalf(format string, args ...any) {}
diff --git a/test/utils/apiserver/testapiserver.go b/test/utils/apiserver/testapiserver.go
index a40baf7bd7e..dbce541e92b 100644
--- a/test/utils/apiserver/testapiserver.go
+++ b/test/utils/apiserver/testapiserver.go
@@ -21,6 +21,7 @@ import (
     "testing"
 
     "github.com/google/uuid"
+
     "k8s.io/apiserver/pkg/server/dynamiccertificates"
     etcdserver "k8s.io/apiserver/pkg/storage/etcd3/testserver"
     "k8s.io/apiserver/pkg/storage/storagebackend"
@@ -69,6 +70,7 @@ func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfi
     // the loopback client config uses a loopback cert with different SNI. We need to use the "real"
     // cert, so we'll hope we aren't hacked during a unit test and instead load it from the server we started.
     wardleToKASKubeClientConfig := rest.CopyConfig(kubeClientConfig)
+    wardleToKASKubeClientConfig.ServerName = "" // reset SNI to use the "real" cert
 
     servingCerts, _, err := cert.GetServingCertificatesForURL(wardleToKASKubeClientConfig.Host, "")
     if err != nil {
diff --git a/test/utils/audit.go b/test/utils/audit.go
index c85700a5291..cc5bf1338a5 100644
--- a/test/utils/audit.go
+++ b/test/utils/audit.go
@@ -66,6 +66,7 @@ type MissingEventsReport struct {
     LastEventChecked *auditinternal.Event
     NumEventsChecked int
     MissingEvents    []AuditEvent
+    AllEvents        []AuditEvent
 }
 
 // String returns a human readable string representation of the report
@@ -118,6 +119,7 @@ func CheckAuditLinesFiltered(stream io.Reader, expected []AuditEvent, version sc
         }
 
         expectations.Mark(event)
+        missingReport.AllEvents = append(missingReport.AllEvents, event)
     }
     if err := scanner.Err(); err != nil {
         return missingReport, err
diff --git a/test/utils/kubeconfig/kubeconfig.go b/test/utils/kubeconfig/kubeconfig.go
index 809b40ed97c..93452a5419a 100644
--- a/test/utils/kubeconfig/kubeconfig.go
+++ b/test/utils/kubeconfig/kubeconfig.go
@@ -45,6 +45,7 @@ func CreateKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
     cluster := clientcmdapi.NewCluster()
     cluster.Server = clientCfg.Host
+    cluster.TLSServerName = clientCfg.ServerName
     cluster.CertificateAuthority = clientCfg.CAFile
     if len(cluster.CertificateAuthority) == 0 {
         cluster.CertificateAuthorityData = clientCfg.CAData