From 6a6771b514f5c6abdb638600f8d85214702a2f7d Mon Sep 17 00:00:00 2001
From: Monis Khan
Date: Fri, 12 Jul 2024 23:10:35 -0400
Subject: [PATCH] svm: set UID and RV on SSA patch to cause conflict on logical create

When a resource gets deleted during migration, the SVM SSA patch calls are
interpreted as a logical create request. Since the object from storage is nil,
the merged result is just a type meta object, which lacks a name in the body.
This fails when the API server checks that the name from the request URL and
the body are the same. Note that a create request is something the SVM
controller should never issue.

Once the UID is set on the patch, the API server fails the request at a
slightly earlier point with an "uid mismatch" conflict error, which the SVM
controller can handle gracefully.

Setting the UID by itself is not sufficient. When a resource gets deleted and
recreated, if the RV is not set but the UID is set, we would get an immutable
field validation error for attempting to update the UID. To address this, we
set the resource version on the SSA patch as well. This causes that update
request to also fail with a conflict error.

Added the create verb on all resources for the SVM controller RBAC, as
otherwise the API server would reject the request before it fails with a
conflict error.

The change addresses a host of other issues with the SVM controller:

1. Include the failure message in the SVM resource
2. Do not block forever on an unsynced GC monitor
3. Do not immediately fail on a missing GC monitor; allow for a grace period
   since discovery may be out of sync
4. Set higher QPS and burst to handle large migrations

Test changes:

1. Clean up CRD webhook converter logs
2. Allow SVM tests to be run multiple times to make finding flakes easier
3. Create and delete CRs during the CRD test to force out any flakes
4. Add a stress test with multiple parallel migrations
5. Enable RBAC on KAS
6. Run KCM directly to exercise wiring and RBAC
7. Better logs during CRD migration
8.
Scan audit logs to confirm SVM controller never creates Signed-off-by: Monis Khan --- .../app/storageversionmigrator.go | 4 + .../storageversionmigrator/resourceversion.go | 2 +- .../storageversionmigrator.go | 102 +++-- pkg/controller/storageversionmigrator/util.go | 3 +- .../rbac/bootstrappolicy/controller_policy.go | 5 +- .../converter/framework.go | 9 +- .../storageversionmigrator_test.go | 87 +++- .../storageversionmigrator/util.go | 385 +++++++++++------- test/utils/apiserver/testapiserver.go | 2 + test/utils/audit.go | 2 + test/utils/kubeconfig/kubeconfig.go | 1 + 11 files changed, 413 insertions(+), 189 deletions(-) diff --git a/cmd/kube-controller-manager/app/storageversionmigrator.go b/cmd/kube-controller-manager/app/storageversionmigrator.go index d1cb4f2160c..99cf02e0953 100644 --- a/cmd/kube-controller-manager/app/storageversionmigrator.go +++ b/cmd/kube-controller-manager/app/storageversionmigrator.go @@ -54,7 +54,11 @@ func startSVMController( return nil, true, fmt.Errorf("storage version migrator requires garbage collector") } + // svm controller can make a lot of requests during migration, keep it fast config := controllerContext.ClientBuilder.ConfigOrDie(controllerName) + config.QPS *= 20 + config.Burst *= 100 + client := controllerContext.ClientBuilder.ClientOrDie(controllerName) informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations() diff --git a/pkg/controller/storageversionmigrator/resourceversion.go b/pkg/controller/storageversionmigrator/resourceversion.go index 169615bfdc0..7ce40ba4113 100644 --- a/pkg/controller/storageversionmigrator/resourceversion.go +++ b/pkg/controller/storageversionmigrator/resourceversion.go @@ -199,7 +199,7 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error StorageVersionMigrations(). UpdateStatus( ctx, - setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource does not exist in discovery"), metav1.UpdateOptions{}, ) if err != nil { diff --git a/pkg/controller/storageversionmigrator/storageversionmigrator.go b/pkg/controller/storageversionmigrator/storageversionmigrator.go index a1284f75b21..e90653751d7 100644 --- a/pkg/controller/storageversionmigrator/storageversionmigrator.go +++ b/pkg/controller/storageversionmigrator/storageversionmigrator.go @@ -204,27 +204,35 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error { } gvr := getGVRFromResource(toBeProcessedSVM) - resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr) + // prevent unsynced monitor from blocking forever + // use a short timeout so that we can fail quickly and possibly handle other migrations while this monitor gets ready. 
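Side note, not part of the patch: a minimal sketch of the SSA patch body described in the commit message. The typeMetaUIDRV and objectMetaUIDandRV names mirror the structs added later in this diff; the kind, UID, and resourceVersion values are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// The patch carries only the GVK plus metadata.uid and metadata.resourceVersion.
// Applying it rewrites the object at the current storage version when the UID and
// RV still match; if the object was modified, deleted, or deleted and recreated,
// the API server returns a conflict that the controller treats as "nothing to migrate".
type typeMetaUIDRV struct {
	metav1.TypeMeta    `json:",inline"`
	objectMetaUIDandRV `json:"metadata,omitempty"`
}

type objectMetaUIDandRV struct {
	UID             types.UID `json:"uid,omitempty"`
	ResourceVersion string    `json:"resourceVersion,omitempty"`
}

func main() {
	patch := typeMetaUIDRV{
		TypeMeta:           metav1.TypeMeta{APIVersion: "v1", Kind: "Secret"},
		objectMetaUIDandRV: objectMetaUIDandRV{UID: "9f8e7d6c-0000-0000-0000-000000000000", ResourceVersion: "42"},
	}
	data, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	// {"kind":"Secret","apiVersion":"v1","metadata":{"uid":"9f8e7d6c-0000-0000-0000-000000000000","resourceVersion":"42"}}
	fmt.Println(string(data))
}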
+ monCtx, monCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer monCtxCancel() + resourceMonitor, errMonitor := svmc.dependencyGraphBuilder.GetMonitor(monCtx, gvr) if resourceMonitor != nil { - if err != nil { + if errMonitor != nil { // non nil monitor indicates that error is due to resource not being synced - return fmt.Errorf("dependency graph is not synced, requeuing to attempt again") + return fmt.Errorf("dependency graph is not synced, requeuing to attempt again: %w", errMonitor) } } else { + logger.V(4).Error(errMonitor, "resource does not exist in GC", "gvr", gvr.String()) + + // our GC cache could be missing a recently created custom resource, so give it some time to catch up + // we resync discovery every 30 seconds so twice that should be sufficient + if toBeProcessedSVM.CreationTimestamp.Add(time.Minute).After(time.Now()) { + return fmt.Errorf("resource does not exist in GC, requeuing to attempt again: %w", errMonitor) + } + // we can't migrate a resource that doesn't exist in the GC - _, err = svmc.kubeClient.StoragemigrationV1alpha1(). + _, errStatus := svmc.kubeClient.StoragemigrationV1alpha1(). StorageVersionMigrations(). UpdateStatus( ctx, - setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource not found"), metav1.UpdateOptions{}, ) - if err != nil { - return err - } - logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String()) - return nil + return errStatus } gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion()) @@ -244,7 +252,7 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error { StorageVersionMigrations(). UpdateStatus( ctx, - setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason), + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason, ""), metav1.UpdateOptions{}, ) if err != nil { @@ -255,60 +263,72 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error { if err != nil { return err } - typeMeta := metav1.TypeMeta{} - typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind() - data, err := json.Marshal(typeMeta) - if err != nil { - return err - } // ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure // process storage migration - for _, gvrKey := range resourceMonitor.Store.ListKeys() { - namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey) + for _, obj := range resourceMonitor.Store.List() { + accessor, err := meta.Accessor(obj) if err != nil { return err } - _, err = svmc.dynamicClient.Resource(gvr). - Namespace(namespace). + typeMeta := typeMetaUIDRV{} + typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind() + // set UID so that when a resource gets deleted, we get an "uid mismatch" + // conflict error instead of trying to create it. + typeMeta.UID = accessor.GetUID() + // set RV so that when a resources gets updated or deleted+recreated, we get an "object has been modified" + // conflict error. we do not actually need to do anything special for the updated case because if RV + // was not set, it would just result in no-op request. but for the deleted+recreated case, if RV is + // not set but UID is set, we would get an immutable field validation error. hence we must set both. 
+ typeMeta.ResourceVersion = accessor.GetResourceVersion() + data, err := json.Marshal(typeMeta) + if err != nil { + return err + } + + _, errPatch := svmc.dynamicClient.Resource(gvr). + Namespace(accessor.GetNamespace()). Patch(ctx, - name, + accessor.GetName(), types.ApplyPatchType, data, metav1.PatchOptions{ FieldManager: svmc.controllerName, }, ) - if err != nil { - // in case of NotFound or Conflict, we can stop processing migration for that resource - if apierrors.IsNotFound(err) || apierrors.IsConflict(err) { - continue - } - _, err = svmc.kubeClient.StoragemigrationV1alpha1(). + // in case of conflict, we can stop processing migration for that resource because it has either been + // - updated, meaning that migration has already been performed + // - deleted, meaning that migration is not needed + // - deleted and recreated, meaning that migration has already been performed + if apierrors.IsConflict(errPatch) { + logger.V(6).Info("Resource ignored due to conflict", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "err", errPatch) + continue + } + + if errPatch != nil { + logger.V(4).Error(errPatch, "Failed to migrate the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "reason", apierrors.ReasonForError(errPatch)) + + _, errStatus := svmc.kubeClient.StoragemigrationV1alpha1(). StorageVersionMigrations(). UpdateStatus( ctx, - setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason), + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "migration encountered unhandled error"), metav1.UpdateOptions{}, ) - if err != nil { - return err - } - logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err)) - return nil + return errStatus // Todo: add retry for scenarios where API server returns rate limiting error } - logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String()) + logger.V(4).Info("Successfully migrated the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String()) } _, err = svmc.kubeClient.StoragemigrationV1alpha1(). StorageVersionMigrations(). 
UpdateStatus( ctx, - setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason), + setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason, ""), metav1.UpdateOptions{}, ) if err != nil { @@ -318,3 +338,13 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error { logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime)) return nil } + +type typeMetaUIDRV struct { + metav1.TypeMeta `json:",inline"` + objectMetaUIDandRV `json:"metadata,omitempty"` +} + +type objectMetaUIDandRV struct { + UID types.UID `json:"uid,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` +} diff --git a/pkg/controller/storageversionmigrator/util.go b/pkg/controller/storageversionmigrator/util.go index 08f6efc0188..816f85f1b39 100644 --- a/pkg/controller/storageversionmigrator/util.go +++ b/pkg/controller/storageversionmigrator/util.go @@ -62,7 +62,7 @@ func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType sv func setStatusConditions( toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration, conditionType svmv1alpha1.MigrationConditionType, - reason string, + reason, message string, ) *svmv1alpha1.StorageVersionMigration { if !IsConditionTrue(toBeUpdatedSVM, conditionType) { if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed { @@ -77,6 +77,7 @@ func setStatusConditions( Status: corev1.ConditionTrue, LastUpdateTime: metav1.Now(), Reason: reason, + Message: message, }) } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index c2c6ac442f4..ea28632ccf6 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -486,7 +486,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) Name: saRolePrefix + "storage-version-migrator-controller", }, Rules: []rbacv1.PolicyRule{ - rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(), + // need list to get current RV for any resource + // need patch for SSA of any resource + // need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set + rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(), }, }) diff --git a/test/images/agnhost/crd-conversion-webhook/converter/framework.go b/test/images/agnhost/crd-conversion-webhook/converter/framework.go index 9df06c05ca9..c16a684f0d3 100644 --- a/test/images/agnhost/crd-conversion-webhook/converter/framework.go +++ b/test/images/agnhost/crd-conversion-webhook/converter/framework.go @@ -17,6 +17,7 @@ limitations under the License. 
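For reference, a hedged sketch of the PolicyRule that the rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*") call above expands to, using the rbac/v1 API types. This is illustrative only and not part of the patch; the package and variable names are made up.

package policy // hypothetical package, for illustration only

import rbacv1 "k8s.io/api/rbac/v1"

// list: read the current RV of every resource being migrated.
// patch: issue the SSA apply requests.
// create: an apply against an already-deleted object is interpreted as a create;
// it always ends in a conflict because the UID is set, but authorization runs
// before that conflict is returned, so the verb must still be allowed.
var svmMigratorRule = rbacv1.PolicyRule{
	Verbs:     []string{"list", "create", "patch"},
	APIGroups: []string{"*"},
	Resources: []string{"*"},
}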
package converter import ( + "bytes" "fmt" "io" "net/http" @@ -131,7 +132,7 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) { return } - klog.V(2).Infof("handling request: %v", body) + klog.V(2).Infof("handling request: %s", string(body)) obj, gvk, err := serializer.Decode(body, nil, nil) if err != nil { msg := fmt.Sprintf("failed to deserialize body (%v) with error %v", string(body), err) @@ -152,7 +153,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) { } convertReview.Response = doConversionV1beta1(convertReview.Request, convert) convertReview.Response.UID = convertReview.Request.UID - klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response)) // reset the request, it is not needed in a response. convertReview.Request = &v1beta1.ConversionRequest{} @@ -167,7 +167,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) { } convertReview.Response = doConversionV1(convertReview.Request, convert) convertReview.Response.UID = convertReview.Request.UID - klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response)) // reset the request, it is not needed in a response. convertReview.Request = &v1.ConversionRequest{} @@ -187,12 +186,14 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) { http.Error(w, msg, http.StatusBadRequest) return } - err = outSerializer.Encode(responseObj, w) + var buf bytes.Buffer + err = outSerializer.Encode(responseObj, io.MultiWriter(w, &buf)) if err != nil { klog.Error(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } + klog.V(2).Infof("sending response: %s", buf.String()) } // ServeExampleConvert servers endpoint for the example converter defined as convertExampleCRD function. diff --git a/test/integration/storageversionmigrator/storageversionmigrator_test.go b/test/integration/storageversionmigrator/storageversionmigrator_test.go index 92961a1b6cd..581064e4af0 100644 --- a/test/integration/storageversionmigrator/storageversionmigrator_test.go +++ b/test/integration/storageversionmigrator/storageversionmigrator_test.go @@ -19,20 +19,24 @@ package storageversionmigrator import ( "bytes" "context" + "strconv" + "sync" "testing" "time" - etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3" - "k8s.io/klog/v2/ktesting" + "go.uber.org/goleak" svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller" + etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3" utilfeature "k8s.io/apiserver/pkg/util/feature" clientgofeaturegate "k8s.io/client-go/features" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/test/integration/framework" ) // TestStorageVersionMigration is an integration test that verifies storage version migration works. @@ -50,7 +54,7 @@ func TestStorageVersionMigration(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true) // this makes the test super responsive. It's set to a default of 1 minute. 
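The converter change above encodes the response through io.MultiWriter so the bytes are written to the client once and simultaneously captured for logging. A standalone sketch of that idiom, not part of the patch: os.Stdout stands in for the http.ResponseWriter and the payload is made up.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
)

func main() {
	var buf bytes.Buffer
	// Everything the encoder writes goes both to the client and into buf,
	// so the exact bytes that were sent can be logged afterwards.
	w := io.MultiWriter(os.Stdout, &buf)
	if err := json.NewEncoder(w).Encode(map[string]string{"status": "Success"}); err != nil {
		panic(err)
	}
	fmt.Printf("sending response: %s", buf.String())
}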
- encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Millisecond + encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) @@ -152,7 +156,12 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true) // decode errors are expected when using conversation webhooks etcd3watcher.TestOnlySetFatalOnDecodeError(false) - defer etcd3watcher.TestOnlySetFatalOnDecodeError(true) + t.Cleanup(func() { etcd3watcher.TestOnlySetFatalOnDecodeError(true) }) + framework.GoleakCheck(t, // block test clean up and let any lingering watches complete before making decode errors fatal again + goleak.IgnoreTopFunction("k8s.io/kubernetes/vendor/gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), + goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), + goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"), + ) _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) @@ -163,6 +172,9 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { svmTest := svmSetup(ctx, t) certCtx := svmTest.setupServerCert(t) + // simulate monkeys creating and deleting CRs + svmTest.createChaos(ctx, t) + // create CRD with v1 serving and storage crd := svmTest.createCRD(t, crdName, crdGroup, certCtx, v1CRDVersion) @@ -238,7 +250,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { if err != nil { t.Fatalf("Failed to create SVM resource: %v", err) } - if ok := svmTest.isCRDMigrated(ctx, t, svm.Name); !ok { + if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok { t.Fatalf("CRD not migrated") } @@ -261,10 +273,73 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) { shutdownServer() // assert RV and Generations of CRs - svmTest.validateRVAndGeneration(ctx, t, crVersions) + svmTest.validateRVAndGeneration(ctx, t, crVersions, "v2") // assert v2 CRs can be listed if err := svmTest.listCR(ctx, t, "v2"); err != nil { t.Fatalf("Failed to list CRs at version v2: %v", err) } } + +// TestStorageVersionMigrationDuringChaos serves as a stress test for the SVM controller. +// It creates a CRD and a reasonable number of static instances for that resource. +// It also continuously creates and deletes instances of that resource. +// During all of this, it attempts to perform multiple parallel migrations of the resource. +// It asserts that all migrations are successful and that none of the static instances +// were changed after they were initially created (as the migrations must be a no-op). 
+func TestStorageVersionMigrationDuringChaos(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true) + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + svmTest := svmSetup(ctx, t) + + svmTest.createChaos(ctx, t) + + crd := svmTest.createCRD(t, crdName, crdGroup, nil, v1CRDVersion) + + crVersions := make(map[string]versions) + + for i := range 50 { // a more realistic number of total resources + cr := svmTest.createCR(ctx, t, "created-cr-"+strconv.Itoa(i), "v1") + crVersions[cr.GetName()] = versions{ + generation: cr.GetGeneration(), + rv: cr.GetResourceVersion(), + isRVUpdated: false, // none of these CRs should change due to migrations + } + } + + var wg sync.WaitGroup + const migrations = 10 // more than the total workers of SVM + wg.Add(migrations) + for i := range migrations { + i := i + go func() { + defer wg.Done() + + svm, err := svmTest.createSVMResource( + ctx, t, "chaos-svm-"+strconv.Itoa(i), + svmv1alpha1.GroupVersionResource{ + Group: crd.Spec.Group, + Version: "v1", + Resource: crd.Spec.Names.Plural, + }, + ) + if err != nil { + t.Errorf("Failed to create SVM resource: %v", err) + return + } + triggerCRName := "chaos-trigger-" + strconv.Itoa(i) + if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok { + t.Errorf("CRD not migrated") + return + } + }() + } + wg.Wait() + + svmTest.validateRVAndGeneration(ctx, t, crVersions, "v1") +} diff --git a/test/integration/storageversionmigrator/util.go b/test/integration/storageversionmigrator/util.go index 8065a1df4a8..7b78e673209 100644 --- a/test/integration/storageversionmigrator/util.go +++ b/test/integration/storageversionmigrator/util.go @@ -31,6 +31,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -45,33 +46,29 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" auditinternal "k8s.io/apiserver/pkg/apis/audit" auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" endpointsdiscovery "k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/discovery" - cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/metadata" - "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/cert" "k8s.io/client-go/util/keyutil" utiltesting "k8s.io/client-go/util/testing" - "k8s.io/controller-manager/pkg/informerfactory" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" - "k8s.io/kubernetes/cmd/kube-controller-manager/names" - "k8s.io/kubernetes/pkg/controller/garbagecollector" + kubecontrollermanagertesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing" "k8s.io/kubernetes/pkg/controller/storageversionmigrator" "k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/utils" + 
"k8s.io/kubernetes/test/utils/kubeconfig" utilnet "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -107,6 +104,11 @@ rules: - group: "" resources: ["secrets"] verbs: ["patch"] + - level: Metadata + resources: + - group: "stable.example.com" + resources: ["testcrds"] + users: ["system:serviceaccount:kube-system:storage-version-migrator-controller"] `, "initialEncryptionConfig": ` kind: EncryptionConfiguration @@ -272,81 +274,32 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { "--audit-log-version", "audit.k8s.io/v1", "--audit-log-mode", "blocking", "--audit-log-path", logFile.Name(), + "--authorization-mode=RBAC", } storageConfig := framework.SharedEtcd() server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig) + kubeConfigFile := createKubeConfigFileForRestConfig(t, server.ClientConfig) + + kcm := kubecontrollermanagertesting.StartTestServerOrDie(ctx, []string{ + "--kubeconfig=" + kubeConfigFile, + "--controllers=garbagecollector,svm", // these are the only controllers needed for this test + "--use-service-account-credentials=true", // exercise RBAC of SVM controller + "--leader-elect=false", // KCM leader election calls os.Exit when it ends, so it is easier to just turn it off altogether + }) + clientSet, err := clientset.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("error in create clientset: %v", err) } - - discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig) if err != nil { t.Fatalf("failed to create discovery client: %v", err) } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) - restMapper.Reset() - metadataClient, err := metadata.NewForConfig(server.ClientConfig) - if err != nil { - t.Fatalf("failed to create metadataClient: %v", err) - } dynamicClient, err := dynamic.NewForConfig(server.ClientConfig) if err != nil { t.Fatalf("error in create dynamic client: %v", err) } - sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) - metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) - alwaysStarted := make(chan struct{}) - close(alwaysStarted) - - gc, err := garbagecollector.NewGarbageCollector( - ctx, - clientSet, - metadataClient, - restMapper, - garbagecollector.DefaultIgnoredResources(), - informerfactory.NewInformerFactory(sharedInformers, metadataInformers), - alwaysStarted, - ) - if err != nil { - t.Fatalf("error while creating garbage collector: %v", err) - - } - startGC := func() { - syncPeriod := 5 * time.Second - go wait.Until(func() { - restMapper.Reset() - }, syncPeriod, ctx.Done()) - go gc.Run(ctx, 1) - go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) - } - - svmController := storageversionmigrator.NewSVMController( - ctx, - clientSet, - dynamicClient, - sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), - names.StorageVersionMigratorController, - restMapper, - gc.GetDependencyGraphBuilder(), - ) - - rvController := storageversionmigrator.NewResourceVersionController( - ctx, - clientSet, - rvDiscoveryClient, - metadataClient, - sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(), - restMapper, - ) - - // Start informer and controllers - sharedInformers.Start(ctx.Done()) - startGC() - go svmController.Run(ctx) - go rvController.Run(ctx) svmTest := &svmTest{ storageConfig: storageConfig, @@ -361,6 +314,19 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { } t.Cleanup(func() { + var 
validCodes = sets.New[int32](http.StatusOK, http.StatusConflict) // make sure SVM controller never creates + _ = svmTest.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { + if event.User != "system:serviceaccount:kube-system:storage-version-migrator-controller" { + return false + } + if !validCodes.Has(event.Code) { + t.Errorf("svm controller had invalid response code for event: %#v", event) + return true + } + return false + }) + + kcm.TearDownFn() server.TearDownFn() utiltesting.CloseAndRemove(t, svmTest.logFile) utiltesting.CloseAndRemove(t, svmTest.policyFile) @@ -373,6 +339,18 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest { return svmTest } +func createKubeConfigFileForRestConfig(t *testing.T, restConfig *rest.Config) string { + t.Helper() + + clientConfig := kubeconfig.CreateKubeConfig(restConfig) + + kubeConfigFile := filepath.Join(t.TempDir(), "kubeconfig.yaml") + if err := clientcmd.WriteToFile(*clientConfig, kubeConfigFile); err != nil { + t.Fatal(err) + } + return kubeConfigFile +} + func createEncryptionConfig(t *testing.T, encryptionConfig string) ( filePathForEncryptionConfig string, err error, @@ -606,82 +584,133 @@ func (svm *svmTest) waitForResourceMigration( ) bool { t.Helper() - var isMigrated bool + var triggerOnce sync.Once + err := wait.PollUntilContextTimeout( ctx, 500*time.Millisecond, - wait.ForeverTestTimeout, + 5*time.Minute, true, func(ctx context.Context) (bool, error) { svmResource, err := svm.getSVM(ctx, t, svmName) if err != nil { t.Fatalf("Failed to get SVM resource: %v", err) } + + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) { + t.Logf("%q SVM has failed migration, %#v", svmName, svmResource.Status.Conditions) + return false, fmt.Errorf("SVM has failed migration") + } + if svmResource.Status.ResourceVersion == "" { + t.Logf("%q SVM has no resourceVersion", svmName) return false, nil } if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) { - isMigrated = true + t.Logf("%q SVM has completed migration", svmName) + return true, nil } + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) { + t.Logf("%q SVM migration is running, %#v", svmName, svmResource.Status.Conditions) + return false, nil + } + + t.Logf("%q SVM has not started migration, %#v", svmName, svmResource.Status.Conditions) + // We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration. // However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes. // To expedite the update of the GC cache, we create a dummy secret and then promptly delete it. // This action forces the GC to refresh its cache, enabling us to proceed with the migration. 
- _, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace) - if err != nil { - t.Fatalf("Failed to create secret: %v", err) - } - err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{}) - if err != nil { - t.Fatalf("Failed to delete secret: %v", err) - } - - stream, err := os.Open(svm.logFile.Name()) - if err != nil { - t.Fatalf("Failed to open audit log file: %v", err) - } - defer func() { - if err := stream.Close(); err != nil { - t.Errorf("error while closing audit log file: %v", err) + // At this point we know that the RV has been set on the SVM resource, so the trigger will always have a higher RV. + // We only need to do this once. + triggerOnce.Do(func() { + _, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace) + if err != nil { + t.Fatalf("Failed to create secret: %v", err) } - }() + err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete secret: %v", err) + } + }) - missingReport, err := utils.CheckAuditLines( - stream, - []utils.AuditEvent{ - { - Level: auditinternal.LevelMetadata, - Stage: auditinternal.StageResponseComplete, - RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name), - Verb: "patch", - Code: 200, - User: "system:apiserver", - Resource: "secrets", - Namespace: "default", - AuthorizeDecision: "allow", - RequestObject: false, - ResponseObject: false, - }, - }, - auditv1.SchemeGroupVersion, - ) - if err != nil { - t.Fatalf("Failed to check audit log: %v", err) - } - if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) { - isMigrated = false - } - - return isMigrated, nil + return false, nil }, ) if err != nil { + t.Logf("Failed to wait for resource migration for SVM %q with secret %q: %v", svmName, name, err) return false } - return isMigrated + err = wait.PollUntilContextTimeout( + ctx, + 500*time.Millisecond, + wait.ForeverTestTimeout, + true, + func(_ context.Context) (bool, error) { + want := utils.AuditEvent{ + Level: auditinternal.LevelMetadata, + Stage: auditinternal.StageResponseComplete, + RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name), + Verb: "patch", + Code: http.StatusOK, + User: "system:serviceaccount:kube-system:storage-version-migrator-controller", + Resource: "secrets", + Namespace: "default", + AuthorizeDecision: "allow", + RequestObject: false, + ResponseObject: false, + } + + if seen := svm.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { return reflect.DeepEqual(event, want) }); expectedEvents > seen { + t.Logf("audit log did not contain %d expected audit events, only has %d", expectedEvents, seen) + return false, nil + } + + return true, nil + }, + ) + if err != nil { + t.Logf("Failed to wait for audit logs events for SVM %q with secret %q: %v", svmName, name, err) + return false + } + + return true +} + +func (svm *svmTest) countMatchingAuditEvents(t *testing.T, f func(utils.AuditEvent) bool) int { + t.Helper() + + var seen int + for _, event := range svm.getAuditEvents(t) { + if f(event) { + seen++ + } + } + return seen +} + +func (svm *svmTest) getAuditEvents(t *testing.T) []utils.AuditEvent { + t.Helper() + + stream, err := os.Open(svm.logFile.Name()) + if err != nil { + t.Fatalf("Failed to open audit log file: %v", err) + } + defer func() { + if 
err := stream.Close(); err != nil { + t.Errorf("error while closing audit log file: %v", err) + } + }() + + missingReport, err := utils.CheckAuditLines(stream, nil, auditv1.SchemeGroupVersion) + if err != nil { + t.Fatalf("Failed to check audit log: %v", err) + } + + return missingReport.AllEvents } func (svm *svmTest) createCRD( @@ -706,24 +735,27 @@ func (svm *svmTest) createCRD( Plural: pluralName, Singular: name, }, - Scope: apiextensionsv1.NamespaceScoped, - Versions: crdVersions, - Conversion: &apiextensionsv1.CustomResourceConversion{ - Strategy: apiextensionsv1.WebhookConverter, - Webhook: &apiextensionsv1.WebhookConversion{ - ClientConfig: &apiextensionsv1.WebhookClientConfig{ - CABundle: certCtx.signingCert, - URL: ptr.To( - fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler), - ), - }, - ConversionReviewVersions: []string{"v1", "v2"}, - }, - }, + Scope: apiextensionsv1.NamespaceScoped, + Versions: crdVersions, PreserveUnknownFields: false, }, } + if certCtx != nil { + crd.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{ + Strategy: apiextensionsv1.WebhookConverter, + Webhook: &apiextensionsv1.WebhookConversion{ + ClientConfig: &apiextensionsv1.WebhookClientConfig{ + CABundle: certCtx.signingCert, + URL: ptr.To( + fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler), + ), + }, + ConversionReviewVersions: []string{"v1", "v2"}, + }, + } + } + apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig) if err != nil { t.Fatalf("Failed to create apiextensions client: %v", err) @@ -809,7 +841,12 @@ func (svm *svmTest) waitForCRDUpdate( } } -func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured { +type testingT interface { + Helper() + Fatalf(format string, args ...any) +} + +func (svm *svmTest) createCR(ctx context.Context, t testingT, crName, version string) *unstructured.Unstructured { t.Helper() crdResource := schema.GroupVersionResource{ @@ -868,7 +905,7 @@ func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) er return err } -func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) { +func (svm *svmTest) deleteCR(ctx context.Context, t testingT, name, version string) { t.Helper() crdResource := schema.GroupVersionResource{ Group: crdGroup, @@ -883,7 +920,9 @@ func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version st func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc { t.Helper() - http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert) + + mux := http.NewServeMux() + mux.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert) block, _ := pem.Decode(certCtx.key) if block == nil { @@ -904,7 +943,8 @@ func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, c } server := &http.Server{ - Addr: fmt.Sprintf("127.0.0.1:%d", servicePort), + Addr: fmt.Sprintf("127.0.0.1:%d", servicePort), + Handler: mux, TLSConfig: &tls.Config{ Certificates: []tls.Certificate{ { @@ -1030,29 +1070,53 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version) } -func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool { +func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool { t.Helper() + 
var triggerOnce sync.Once + err := wait.PollUntilContextTimeout( ctx, 500*time.Millisecond, - 1*time.Minute, + 5*time.Minute, true, func(ctx context.Context) (bool, error) { - triggerCR := svm.createCR(ctx, t, "triggercr", "v1") - svm.deleteCR(ctx, t, triggerCR.GetName(), "v1") svmResource, err := svm.getSVM(ctx, t, crdSVMName) if err != nil { t.Fatalf("Failed to get SVM resource: %v", err) } + + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) { + t.Logf("%q SVM has failed migration, %#v", crdSVMName, svmResource.Status.Conditions) + return false, fmt.Errorf("SVM has failed migration") + } + if svmResource.Status.ResourceVersion == "" { + t.Logf("%q SVM has no resourceVersion", crdSVMName) return false, nil } if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) { + t.Logf("%q SVM has completed migration", crdSVMName) return true, nil } + if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) { + t.Logf("%q SVM migration is running, %#v", crdSVMName, svmResource.Status.Conditions) + return false, nil + } + + t.Logf("%q SVM has not started migration, %#v", crdSVMName, svmResource.Status.Conditions) + + // at this point we know that the RV has been set on the SVM resource, + // and we need to make sure that the GC list RV has caught up to that without waiting for a watch bookmark. + // we cannot trigger this any earlier as the rest mapper of the RV controller can be delayed + // and thus may not have observed the new CRD yet. we only need to do this once. + triggerOnce.Do(func() { + triggerCR := svm.createCR(ctx, t, triggerCRName, "v1") + svm.deleteCR(ctx, t, triggerCR.GetName(), "v1") + }) + return false, nil }, ) @@ -1065,7 +1129,7 @@ type versions struct { isRVUpdated bool } -func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) { +func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions, getCRVersion string) { t.Helper() for crName, version := range crVersions { @@ -1083,12 +1147,53 @@ func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, c } // validate resourceVersion and generation - crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion() - if version.isRVUpdated && crVersion == version.rv { + crVersion := svm.getCR(ctx, t, crName, getCRVersion).GetResourceVersion() + isRVUnchanged := crVersion == version.rv + if version.isRVUpdated && isRVUnchanged { t.Fatalf("ResourceVersion of CR %s should not be equal. Expected: %s, Got: %s", crName, version.rv, crVersion) } + if !version.isRVUpdated && !isRVUnchanged { + t.Fatalf("ResourceVersion of CR %s should be equal. Expected: %s, Got: %s", crName, version.rv, crVersion) + } if obj.GetGeneration() != version.generation { t.Fatalf("Generation of CR %s should be equal. 
Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration()) } } } + +func (svm *svmTest) createChaos(ctx context.Context, t *testing.T) { + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + + noFailT := ignoreFailures{} // these create and delete requests are not coordinated with the rest of the test and can fail + + const workers = 10 + wg.Add(workers) + for i := range workers { + i := i + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + } + + _ = svm.createCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1") + svm.deleteCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1") + } + }() + } + + t.Cleanup(func() { + cancel() + wg.Wait() + }) +} + +type ignoreFailures struct{} + +func (ignoreFailures) Helper() {} +func (ignoreFailures) Fatalf(format string, args ...any) {} diff --git a/test/utils/apiserver/testapiserver.go b/test/utils/apiserver/testapiserver.go index a40baf7bd7e..dbce541e92b 100644 --- a/test/utils/apiserver/testapiserver.go +++ b/test/utils/apiserver/testapiserver.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/uuid" + "k8s.io/apiserver/pkg/server/dynamiccertificates" etcdserver "k8s.io/apiserver/pkg/storage/etcd3/testserver" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -69,6 +70,7 @@ func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfi // the loopback client config uses a loopback cert with different SNI. We need to use the "real" // cert, so we'll hope we aren't hacked during a unit test and instead load it from the server we started. wardleToKASKubeClientConfig := rest.CopyConfig(kubeClientConfig) + wardleToKASKubeClientConfig.ServerName = "" // reset SNI to use the "real" cert servingCerts, _, err := cert.GetServingCertificatesForURL(wardleToKASKubeClientConfig.Host, "") if err != nil { diff --git a/test/utils/audit.go b/test/utils/audit.go index c85700a5291..cc5bf1338a5 100644 --- a/test/utils/audit.go +++ b/test/utils/audit.go @@ -66,6 +66,7 @@ type MissingEventsReport struct { LastEventChecked *auditinternal.Event NumEventsChecked int MissingEvents []AuditEvent + AllEvents []AuditEvent } // String returns a human readable string representation of the report @@ -118,6 +119,7 @@ func CheckAuditLinesFiltered(stream io.Reader, expected []AuditEvent, version sc } expectations.Mark(event) + missingReport.AllEvents = append(missingReport.AllEvents, event) } if err := scanner.Err(); err != nil { return missingReport, err diff --git a/test/utils/kubeconfig/kubeconfig.go b/test/utils/kubeconfig/kubeconfig.go index 809b40ed97c..93452a5419a 100644 --- a/test/utils/kubeconfig/kubeconfig.go +++ b/test/utils/kubeconfig/kubeconfig.go @@ -45,6 +45,7 @@ func CreateKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config { cluster := clientcmdapi.NewCluster() cluster.Server = clientCfg.Host + cluster.TLSServerName = clientCfg.ServerName cluster.CertificateAuthority = clientCfg.CAFile if len(cluster.CertificateAuthority) == 0 { cluster.CertificateAuthorityData = clientCfg.CAData