svm: set UID and RV on SSA patch to cause conflict on logical create

When a resource gets deleted during migration, the SVM controller's
SSA patch calls are interpreted as a logical create request.  Since
the object from storage is nil, the merged result is just a type
meta object, which lacks a name in the body.  This fails when the
API server checks that the name from the request URL matches the
name in the body.  Note that a create request is something the SVM
controller should never issue.

Once the UID is set on the patch, the API server will fail the
request at a slightly earlier point with an "uid mismatch" conflict
error, which the SVM controller can handle gracefully.

Setting the UID by itself is not sufficient.  When a resource gets
deleted and recreated, a patch that sets the UID but not the RV
fails with an immutable field validation error for attempting to
update the UID.  To address this, we set the resource version on
the SSA patch as well.  This causes that update request to also
fail with a conflict error.
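
For illustration, here is a minimal sketch (not the actual
controller code) of an SSA patch that pins both fields via the
dynamic client; the helper name and the "existing" object are
assumptions for the example:

    import (
        "context"
        "encoding/json"

        apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/dynamic"
    )

    // applyWithUIDAndRV is a hypothetical helper; existing is the last
    // observed state of the object being migrated.
    func applyWithUIDAndRV(ctx context.Context, client dynamic.Interface,
        gvr schema.GroupVersionResource, existing *unstructured.Unstructured) error {
        obj := &unstructured.Unstructured{}
        obj.SetGroupVersionKind(existing.GroupVersionKind())
        obj.SetName(existing.GetName())
        obj.SetNamespace(existing.GetNamespace())
        // Pinning the UID turns a delete into a "uid mismatch" conflict
        // instead of a logical create.
        obj.SetUID(existing.GetUID())
        // Pinning the RV turns a delete-and-recreate into a conflict
        // instead of an immutable UID validation error.
        obj.SetResourceVersion(existing.GetResourceVersion())
        data, err := json.Marshal(obj)
        if err != nil {
            return err
        }
        _, err = client.Resource(gvr).Namespace(existing.GetNamespace()).Patch(
            ctx, existing.GetName(), types.ApplyPatchType, data,
            metav1.PatchOptions{FieldManager: "storage-version-migrator-controller"})
        if apierrors.IsConflict(err) {
            return nil // expected when the object was deleted mid-migration
        }
        return err
    }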

Added the create verb on all resources in the SVM controller's
RBAC, as otherwise the API server would reject the request with a
permission error before it could fail with a conflict error.
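
For reference, a sketch of the shape of that rule as an
rbacv1.PolicyRule; this is illustrative rather than the exact
manifest, and the verbs other than patch and create are assumed:

    // rbacv1 is "k8s.io/api/rbac/v1".  The create verb is never meant
    // to succeed for the SVM controller; it only needs to be authorized
    // so the request reaches the storage layer and fails there with a
    // 409 Conflict rather than a 403 Forbidden.
    rule := rbacv1.PolicyRule{
        APIGroups: []string{"*"},
        Resources: []string{"*"},
        Verbs:     []string{"get", "list", "watch", "patch", "create"},
    }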

The change addresses a host of other issues with the SVM controller:

1. Include failure message in SVM resource
2. Do not block forever on unsynced GC monitor
3. Do not immediately fail when the GC monitor is missing; allow
   for a grace period since discovery may be out of sync
4. Set higher QPS and burst to handle large migrations (see the
   sketch after this list)
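
As an illustration of item 4, raising the client-side rate limits
looks roughly like this; the concrete numbers below are
placeholders, not the values the controller actually uses:

    // rest is "k8s.io/client-go/rest".  client-go defaults to QPS=5
    // and Burst=10, which throttles the many list and patch calls a
    // large migration issues.
    cfg := rest.CopyConfig(baseConfig) // baseConfig is an assumed *rest.Config
    cfg.QPS = 40   // placeholder value
    cfg.Burst = 80 // placeholder value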

Test changes:

1. Clean up CRD webhook converter logs
2. Allow SVM tests to be run multiple times to make finding flakes easier
3. Create and delete CRs during CRD test to force out any flakes
4. Add a stress test with multiple parallel migrations
5. Enable RBAC on KAS
6. Run KCM directly to exercise wiring and RBAC
7. Better logs during CRD migration
8. Scan audit logs to confirm SVM controller never creates

Signed-off-by: Monis Khan <mok@microsoft.com>


@@ -19,20 +19,24 @@ package storageversionmigrator
import (
"bytes"
"context"
"strconv"
"sync"
"testing"
"time"
-etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
-"k8s.io/klog/v2/ktesting"
+"go.uber.org/goleak"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
+etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientgofeaturegate "k8s.io/client-go/features"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
)
// TestStorageVersionMigration is an integration test that verifies storage version migration works.
@@ -50,7 +54,7 @@ func TestStorageVersionMigration(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
// this makes the test super responsive. It's set to a default of 1 minute.
-encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Millisecond
+encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
@@ -152,7 +156,12 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
// decode errors are expected when using conversion webhooks
etcd3watcher.TestOnlySetFatalOnDecodeError(false)
-defer etcd3watcher.TestOnlySetFatalOnDecodeError(true)
+t.Cleanup(func() { etcd3watcher.TestOnlySetFatalOnDecodeError(true) })
framework.GoleakCheck(t, // block test clean up and let any lingering watches complete before making decode errors fatal again
goleak.IgnoreTopFunction("k8s.io/kubernetes/vendor/gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"),
)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
@@ -163,6 +172,9 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
svmTest := svmSetup(ctx, t)
certCtx := svmTest.setupServerCert(t)
// simulate monkeys creating and deleting CRs
svmTest.createChaos(ctx, t)
// create CRD with v1 serving and storage
crd := svmTest.createCRD(t, crdName, crdGroup, certCtx, v1CRDVersion)
@@ -238,7 +250,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create SVM resource: %v", err)
}
-if ok := svmTest.isCRDMigrated(ctx, t, svm.Name); !ok {
+if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok {
t.Fatalf("CRD not migrated")
}
@@ -261,10 +273,73 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
shutdownServer()
// assert RV and Generations of CRs
-svmTest.validateRVAndGeneration(ctx, t, crVersions)
+svmTest.validateRVAndGeneration(ctx, t, crVersions, "v2")
// assert v2 CRs can be listed
if err := svmTest.listCR(ctx, t, "v2"); err != nil {
t.Fatalf("Failed to list CRs at version v2: %v", err)
}
}
// TestStorageVersionMigrationDuringChaos serves as a stress test for the SVM controller.
// It creates a CRD and a reasonable number of static instances for that resource.
// It also continuously creates and deletes instances of that resource.
// During all of this, it attempts to perform multiple parallel migrations of the resource.
// It asserts that all migrations are successful and that none of the static instances
// were changed after they were initially created (as the migrations must be a no-op).
func TestStorageVersionMigrationDuringChaos(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
svmTest := svmSetup(ctx, t)
svmTest.createChaos(ctx, t)
crd := svmTest.createCRD(t, crdName, crdGroup, nil, v1CRDVersion)
crVersions := make(map[string]versions)
for i := range 50 { // a more realistic number of total resources
cr := svmTest.createCR(ctx, t, "created-cr-"+strconv.Itoa(i), "v1")
crVersions[cr.GetName()] = versions{
generation: cr.GetGeneration(),
rv: cr.GetResourceVersion(),
isRVUpdated: false, // none of these CRs should change due to migrations
}
}
var wg sync.WaitGroup
const migrations = 10 // more than the total workers of SVM
wg.Add(migrations)
for i := range migrations {
i := i
go func() {
defer wg.Done()
svm, err := svmTest.createSVMResource(
ctx, t, "chaos-svm-"+strconv.Itoa(i),
svmv1alpha1.GroupVersionResource{
Group: crd.Spec.Group,
Version: "v1",
Resource: crd.Spec.Names.Plural,
},
)
if err != nil {
t.Errorf("Failed to create SVM resource: %v", err)
return
}
triggerCRName := "chaos-trigger-" + strconv.Itoa(i)
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok {
t.Errorf("CRD not migrated")
return
}
}()
}
wg.Wait()
svmTest.validateRVAndGeneration(ctx, t, crVersions, "v1")
}


@@ -31,6 +31,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -45,33 +46,29 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
endpointsdiscovery "k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/controller-manager/pkg/informerfactory"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
kubecontrollermanagertesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversionmigrator"
"k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/kubeconfig"
utilnet "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@@ -107,6 +104,11 @@ rules:
- group: ""
resources: ["secrets"]
verbs: ["patch"]
- level: Metadata
resources:
- group: "stable.example.com"
resources: ["testcrds"]
users: ["system:serviceaccount:kube-system:storage-version-migrator-controller"]
`,
"initialEncryptionConfig": `
kind: EncryptionConfiguration
@@ -272,81 +274,32 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
"--audit-log-version", "audit.k8s.io/v1",
"--audit-log-mode", "blocking",
"--audit-log-path", logFile.Name(),
"--authorization-mode=RBAC",
}
storageConfig := framework.SharedEtcd()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig)
kubeConfigFile := createKubeConfigFileForRestConfig(t, server.ClientConfig)
kcm := kubecontrollermanagertesting.StartTestServerOrDie(ctx, []string{
"--kubeconfig=" + kubeConfigFile,
"--controllers=garbagecollector,svm", // these are the only controllers needed for this test
"--use-service-account-credentials=true", // exercise RBAC of SVM controller
"--leader-elect=false", // KCM leader election calls os.Exit when it ends, so it is easier to just turn it off altogether
})
clientSet, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("error in create clientset: %v", err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("failed to create discovery client: %v", err)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
metadataClient, err := metadata.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("failed to create metadataClient: %v", err)
}
dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("error in create dynamic client: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
ctx,
clientSet,
metadataClient,
restMapper,
garbagecollector.DefaultIgnoredResources(),
informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
alwaysStarted,
)
if err != nil {
t.Fatalf("error while creating garbage collector: %v", err)
}
startGC := func() {
syncPeriod := 5 * time.Second
go wait.Until(func() {
restMapper.Reset()
}, syncPeriod, ctx.Done())
go gc.Run(ctx, 1)
go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
}
svmController := storageversionmigrator.NewSVMController(
ctx,
clientSet,
dynamicClient,
sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
names.StorageVersionMigratorController,
restMapper,
gc.GetDependencyGraphBuilder(),
)
rvController := storageversionmigrator.NewResourceVersionController(
ctx,
clientSet,
rvDiscoveryClient,
metadataClient,
sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
restMapper,
)
// Start informer and controllers
sharedInformers.Start(ctx.Done())
startGC()
go svmController.Run(ctx)
go rvController.Run(ctx)
svmTest := &svmTest{
storageConfig: storageConfig,
@@ -361,6 +314,19 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
}
t.Cleanup(func() {
var validCodes = sets.New[int32](http.StatusOK, http.StatusConflict) // make sure SVM controller never creates
_ = svmTest.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool {
if event.User != "system:serviceaccount:kube-system:storage-version-migrator-controller" {
return false
}
if !validCodes.Has(event.Code) {
t.Errorf("svm controller had invalid response code for event: %#v", event)
return true
}
return false
})
kcm.TearDownFn()
server.TearDownFn()
utiltesting.CloseAndRemove(t, svmTest.logFile)
utiltesting.CloseAndRemove(t, svmTest.policyFile)
@@ -373,6 +339,18 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
return svmTest
}
func createKubeConfigFileForRestConfig(t *testing.T, restConfig *rest.Config) string {
t.Helper()
clientConfig := kubeconfig.CreateKubeConfig(restConfig)
kubeConfigFile := filepath.Join(t.TempDir(), "kubeconfig.yaml")
if err := clientcmd.WriteToFile(*clientConfig, kubeConfigFile); err != nil {
t.Fatal(err)
}
return kubeConfigFile
}
func createEncryptionConfig(t *testing.T, encryptionConfig string) (
filePathForEncryptionConfig string,
err error,
@@ -606,82 +584,133 @@ func (svm *svmTest) waitForResourceMigration(
) bool {
t.Helper()
var isMigrated bool
var triggerOnce sync.Once
err := wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
-wait.ForeverTestTimeout,
+5*time.Minute,
true,
func(ctx context.Context) (bool, error) {
svmResource, err := svm.getSVM(ctx, t, svmName)
if err != nil {
t.Fatalf("Failed to get SVM resource: %v", err)
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
t.Logf("%q SVM has failed migration, %#v", svmName, svmResource.Status.Conditions)
return false, fmt.Errorf("SVM has failed migration")
}
if svmResource.Status.ResourceVersion == "" {
t.Logf("%q SVM has no resourceVersion", svmName)
return false, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
isMigrated = true
t.Logf("%q SVM has completed migration", svmName)
return true, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
t.Logf("%q SVM migration is running, %#v", svmName, svmResource.Status.Conditions)
return false, nil
}
t.Logf("%q SVM has not started migration, %#v", svmName, svmResource.Status.Conditions)
// We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration.
// However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes.
// To expedite the update of the GC cache, we create a dummy secret and then promptly delete it.
// This action forces the GC to refresh its cache, enabling us to proceed with the migration.
-_, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
-if err != nil {
-t.Fatalf("Failed to create secret: %v", err)
-}
-err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{})
-if err != nil {
-t.Fatalf("Failed to delete secret: %v", err)
-}
-stream, err := os.Open(svm.logFile.Name())
-if err != nil {
-t.Fatalf("Failed to open audit log file: %v", err)
-}
-defer func() {
-if err := stream.Close(); err != nil {
-t.Errorf("error while closing audit log file: %v", err)
-}
-}()
+// At this point we know that the RV has been set on the SVM resource, so the trigger will always have a higher RV.
+// We only need to do this once.
+triggerOnce.Do(func() {
+_, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
+if err != nil {
+t.Fatalf("Failed to create secret: %v", err)
+}
+err = svm.client.CoreV1().Secrets(defaultNamespace).Delete(ctx, triggerSecretName, metav1.DeleteOptions{})
+if err != nil {
+t.Fatalf("Failed to delete secret: %v", err)
+}
+})
-missingReport, err := utils.CheckAuditLines(
-stream,
-[]utils.AuditEvent{
-{
-Level: auditinternal.LevelMetadata,
-Stage: auditinternal.StageResponseComplete,
-RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
-Verb: "patch",
-Code: 200,
-User: "system:apiserver",
-Resource: "secrets",
-Namespace: "default",
-AuthorizeDecision: "allow",
-RequestObject: false,
-ResponseObject: false,
-},
-},
-auditv1.SchemeGroupVersion,
-)
-if err != nil {
-t.Fatalf("Failed to check audit log: %v", err)
-}
-if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) {
-isMigrated = false
-}
-return isMigrated, nil
+return false, nil
},
)
if err != nil {
t.Logf("Failed to wait for resource migration for SVM %q with secret %q: %v", svmName, name, err)
return false
}
-return isMigrated
err = wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
wait.ForeverTestTimeout,
true,
func(_ context.Context) (bool, error) {
want := utils.AuditEvent{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
Verb: "patch",
Code: http.StatusOK,
User: "system:serviceaccount:kube-system:storage-version-migrator-controller",
Resource: "secrets",
Namespace: "default",
AuthorizeDecision: "allow",
RequestObject: false,
ResponseObject: false,
}
if seen := svm.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { return reflect.DeepEqual(event, want) }); expectedEvents > seen {
t.Logf("audit log did not contain %d expected audit events, only has %d", expectedEvents, seen)
return false, nil
}
return true, nil
},
)
if err != nil {
t.Logf("Failed to wait for audit logs events for SVM %q with secret %q: %v", svmName, name, err)
return false
}
return true
}
func (svm *svmTest) countMatchingAuditEvents(t *testing.T, f func(utils.AuditEvent) bool) int {
t.Helper()
var seen int
for _, event := range svm.getAuditEvents(t) {
if f(event) {
seen++
}
}
return seen
}
func (svm *svmTest) getAuditEvents(t *testing.T) []utils.AuditEvent {
t.Helper()
stream, err := os.Open(svm.logFile.Name())
if err != nil {
t.Fatalf("Failed to open audit log file: %v", err)
}
defer func() {
if err := stream.Close(); err != nil {
t.Errorf("error while closing audit log file: %v", err)
}
}()
missingReport, err := utils.CheckAuditLines(stream, nil, auditv1.SchemeGroupVersion)
if err != nil {
t.Fatalf("Failed to check audit log: %v", err)
}
return missingReport.AllEvents
}
func (svm *svmTest) createCRD(
@@ -706,24 +735,27 @@ func (svm *svmTest) createCRD(
Plural: pluralName,
Singular: name,
},
Scope: apiextensionsv1.NamespaceScoped,
Versions: crdVersions,
Conversion: &apiextensionsv1.CustomResourceConversion{
Strategy: apiextensionsv1.WebhookConverter,
Webhook: &apiextensionsv1.WebhookConversion{
ClientConfig: &apiextensionsv1.WebhookClientConfig{
CABundle: certCtx.signingCert,
URL: ptr.To(
fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler),
),
},
ConversionReviewVersions: []string{"v1", "v2"},
},
},
Scope: apiextensionsv1.NamespaceScoped,
Versions: crdVersions,
PreserveUnknownFields: false,
},
}
if certCtx != nil {
crd.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
Strategy: apiextensionsv1.WebhookConverter,
Webhook: &apiextensionsv1.WebhookConversion{
ClientConfig: &apiextensionsv1.WebhookClientConfig{
CABundle: certCtx.signingCert,
URL: ptr.To(
fmt.Sprintf("https://127.0.0.1:%d/%s", servicePort, webhookHandler),
),
},
ConversionReviewVersions: []string{"v1", "v2"},
},
}
}
apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig)
if err != nil {
t.Fatalf("Failed to create apiextensions client: %v", err)
@@ -809,7 +841,12 @@ func (svm *svmTest) waitForCRDUpdate(
}
}
-func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured {
+type testingT interface {
+Helper()
+Fatalf(format string, args ...any)
+}
+func (svm *svmTest) createCR(ctx context.Context, t testingT, crName, version string) *unstructured.Unstructured {
t.Helper()
crdResource := schema.GroupVersionResource{
@@ -868,7 +905,7 @@ func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) er
return err
}
-func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) {
+func (svm *svmTest) deleteCR(ctx context.Context, t testingT, name, version string) {
t.Helper()
crdResource := schema.GroupVersionResource{
Group: crdGroup,
@@ -883,7 +920,9 @@ func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version st
func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc {
t.Helper()
-http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
+mux := http.NewServeMux()
+mux.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
block, _ := pem.Decode(certCtx.key)
if block == nil {
@@ -904,7 +943,8 @@ func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, c
}
server := &http.Server{
-Addr: fmt.Sprintf("127.0.0.1:%d", servicePort),
+Addr:    fmt.Sprintf("127.0.0.1:%d", servicePort),
+Handler: mux,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{
{
@@ -1030,29 +1070,53 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo
return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
}
-func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool {
+func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool {
t.Helper()
var triggerOnce sync.Once
err := wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
-1*time.Minute,
+5*time.Minute,
true,
func(ctx context.Context) (bool, error) {
triggerCR := svm.createCR(ctx, t, "triggercr", "v1")
svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
svmResource, err := svm.getSVM(ctx, t, crdSVMName)
if err != nil {
t.Fatalf("Failed to get SVM resource: %v", err)
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
t.Logf("%q SVM has failed migration, %#v", crdSVMName, svmResource.Status.Conditions)
return false, fmt.Errorf("SVM has failed migration")
}
if svmResource.Status.ResourceVersion == "" {
t.Logf("%q SVM has no resourceVersion", crdSVMName)
return false, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
t.Logf("%q SVM has completed migration", crdSVMName)
return true, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
t.Logf("%q SVM migration is running, %#v", crdSVMName, svmResource.Status.Conditions)
return false, nil
}
t.Logf("%q SVM has not started migration, %#v", crdSVMName, svmResource.Status.Conditions)
+// at this point we know that the RV has been set on the SVM resource,
+// and we need to make sure that the GC list RV has caught up to that without waiting for a watch bookmark.
+// we cannot trigger this any earlier as the rest mapper of the RV controller can be delayed
+// and thus may not have observed the new CRD yet. we only need to do this once.
+triggerOnce.Do(func() {
+triggerCR := svm.createCR(ctx, t, triggerCRName, "v1")
+svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
+})
return false, nil
},
)
@@ -1065,7 +1129,7 @@ type versions struct {
isRVUpdated bool
}
-func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) {
+func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions, getCRVersion string) {
t.Helper()
for crName, version := range crVersions {
@@ -1083,12 +1147,53 @@ func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, c
}
// validate resourceVersion and generation
crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion()
if version.isRVUpdated && crVersion == version.rv {
crVersion := svm.getCR(ctx, t, crName, getCRVersion).GetResourceVersion()
isRVUnchanged := crVersion == version.rv
if version.isRVUpdated && isRVUnchanged {
t.Fatalf("ResourceVersion of CR %s should not be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
}
+if !version.isRVUpdated && !isRVUnchanged {
+t.Fatalf("ResourceVersion of CR %s should be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
+}
if obj.GetGeneration() != version.generation {
t.Fatalf("Generation of CR %s should be equal. Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration())
}
}
}
func (svm *svmTest) createChaos(ctx context.Context, t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)
noFailT := ignoreFailures{} // these create and delete requests are not coordinated with the rest of the test and can fail
const workers = 10
wg.Add(workers)
for i := range workers {
i := i
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
_ = svm.createCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1")
svm.deleteCR(ctx, noFailT, "chaos-cr-"+strconv.Itoa(i), "v1")
}
}()
}
t.Cleanup(func() {
cancel()
wg.Wait()
})
}
type ignoreFailures struct{}
func (ignoreFailures) Helper() {}
func (ignoreFailures) Fatalf(format string, args ...any) {}