mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #130276 from stlaz/svm_flakes
Fix SVM test flaking because of occasional slow resource storage update
This commit is contained in:
commit
08570c779b
@ -28,15 +28,17 @@ import (
|
||||
|
||||
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientgofeaturegate "k8s.io/client-go/features"
|
||||
"k8s.io/component-base/featuregate"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
// TestStorageVersionMigration is an integration test that verifies storage version migration works.
|
||||
@ -56,9 +58,7 @@ func TestStorageVersionMigration(t *testing.T) {
|
||||
// this makes the test super responsive. It's set to a default of 1 minute.
|
||||
encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
ctx := ktesting.Init(t)
|
||||
|
||||
svmTest := svmSetup(ctx, t)
|
||||
|
||||
@ -92,7 +92,7 @@ func TestStorageVersionMigration(t *testing.T) {
|
||||
}
|
||||
|
||||
wantPrefix := "k8s:enc:aescbc:v1:key2"
|
||||
etcdSecret, err := svmTest.getRawSecretFromETCD(t, secret.Name, secret.Namespace)
|
||||
etcdSecret, err := svmTest.getRawSecretFromETCD(t, secret.Namespace, secret.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get secret from etcd: %v", err)
|
||||
}
|
||||
@ -163,9 +163,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
|
||||
goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"),
|
||||
)
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
ctx := ktesting.Init(t)
|
||||
|
||||
crVersions := make(map[string]versions)
|
||||
|
||||
@ -210,10 +208,24 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
|
||||
svmTest.updateCRD(ctx, t, crd.Name, v2StorageCRDVersion, []string{"v1", "v2"}, "v2")
|
||||
|
||||
// create CR with v1
|
||||
cr3 := svmTest.createCR(ctx, t, "cr3", "v1")
|
||||
if ok := svmTest.isCRStoredAtVersion(t, "v2", cr3.GetName()); !ok {
|
||||
t.Fatalf("CR not stored at version v2")
|
||||
var cr3 *unstructured.Unstructured
|
||||
// updateCRD checks discovery returns storageVersionHash matching storage version v2
|
||||
// to make sure the API server uses v2 but CRD controllers may race and the resource
|
||||
// might still get stored in v1.
|
||||
// Attempt to recreate the CR until it gets stored as v2.
|
||||
// https://github.com/kubernetes/kubernetes/issues/130235
|
||||
err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(waitCtx context.Context) (done bool, err error) {
|
||||
cr3 = svmTest.createCR(waitCtx, t, "cr3", "v1")
|
||||
if ok := svmTest.isCRStoredAtVersion(t, "v2", cr3.GetName()); !ok {
|
||||
svmTest.deleteCR(waitCtx, t, cr3.GetName(), "v1")
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("timed out waiting for CR to be stored as v2: %v", err)
|
||||
}
|
||||
|
||||
crVersions[cr3.GetName()] = versions{
|
||||
generation: cr3.GetGeneration(),
|
||||
rv: cr3.GetResourceVersion(),
|
||||
@ -291,9 +303,7 @@ func TestStorageVersionMigrationDuringChaos(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
|
||||
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
ctx := ktesting.Init(t)
|
||||
|
||||
svmTest := svmSetup(ctx, t)
|
||||
|
||||
|
@ -388,9 +388,9 @@ func (svm *svmTest) createSecret(ctx context.Context, t *testing.T, name, namesp
|
||||
return svm.client.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func (svm *svmTest) getRawSecretFromETCD(t *testing.T, name, namespace string) ([]byte, error) {
|
||||
func (svm *svmTest) getRawSecretFromETCD(t *testing.T, namespace, name string) ([]byte, error) {
|
||||
t.Helper()
|
||||
secretETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, "", "secrets", name, namespace)
|
||||
secretETCDPath := getETCDPathForResource(t, svm.storageConfig.Prefix, "", "secrets", namespace, name)
|
||||
etcdResponse, err := svm.readRawRecordFromETCD(t, secretETCDPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read %s from etcd: %w", secretETCDPath, err)
|
||||
@ -398,7 +398,7 @@ func (svm *svmTest) getRawSecretFromETCD(t *testing.T, name, namespace string) (
|
||||
return etcdResponse.Kvs[0].Value, nil
|
||||
}
|
||||
|
||||
func (svm *svmTest) getETCDPathForResource(t *testing.T, storagePrefix, group, resource, name, namespaceName string) string {
|
||||
func getETCDPathForResource(t *testing.T, storagePrefix, group, resource, namespaceName, name string) string {
|
||||
t.Helper()
|
||||
groupResource := resource
|
||||
if group != "" {
|
||||
@ -432,9 +432,9 @@ func (svm *svmTest) readRawRecordFromETCD(t *testing.T, path string) (*clientv3.
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (svm *svmTest) getRawCRFromETCD(t *testing.T, name, namespace, crdGroup, crdName string) ([]byte, error) {
|
||||
func (svm *svmTest) getRawCRFromETCD(t *testing.T, crdGroup, crdName, namespace, name string) ([]byte, error) {
|
||||
t.Helper()
|
||||
crdETCDPath := svm.getETCDPathForResource(t, svm.storageConfig.Prefix, crdGroup, crdName, name, namespace)
|
||||
crdETCDPath := getETCDPathForResource(t, svm.storageConfig.Prefix, crdGroup, crdName, namespace, name)
|
||||
etcdResponse, err := svm.readRawRecordFromETCD(t, crdETCDPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read %s from etcd: %v", crdETCDPath, err)
|
||||
@ -808,29 +808,30 @@ func (svm *svmTest) waitForCRDUpdate(
|
||||
return false, fmt.Errorf("failed to get server groups and resources: %w", err)
|
||||
}
|
||||
for _, api := range apiGroups {
|
||||
if api.Name == crdGroup {
|
||||
var servingVersions []string
|
||||
for _, apiVersion := range api.Versions {
|
||||
servingVersions = append(servingVersions, apiVersion.Version)
|
||||
}
|
||||
sort.Strings(servingVersions)
|
||||
if api.Name != crdGroup {
|
||||
continue
|
||||
}
|
||||
var servingVersions []string
|
||||
for _, apiVersion := range api.Versions {
|
||||
servingVersions = append(servingVersions, apiVersion.Version)
|
||||
}
|
||||
sort.Strings(servingVersions)
|
||||
|
||||
// Check if the serving versions are as expected
|
||||
if reflect.DeepEqual(expectedServingVersions, servingVersions) {
|
||||
expectedHash := endpointsdiscovery.StorageVersionHash(crdGroup, expectedStorageVersion, crdKind)
|
||||
resourceList, err := svm.discoveryClient.ServerResourcesForGroupVersion(crdGroup + "/" + api.PreferredVersion.Version)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get server resources for group version: %w", err)
|
||||
}
|
||||
// Check if the serving versions are as expected
|
||||
if !reflect.DeepEqual(expectedServingVersions, servingVersions) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the storage version is as expected
|
||||
for _, resource := range resourceList.APIResources {
|
||||
if resource.Kind == crdKind {
|
||||
if resource.StorageVersionHash == expectedHash {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
expectedHash := endpointsdiscovery.StorageVersionHash(crdGroup, expectedStorageVersion, crdKind)
|
||||
resourceList, err := svm.discoveryClient.ServerResourcesForGroupVersion(crdGroup + "/" + api.PreferredVersion.Version)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get server resources for group version: %w", err)
|
||||
}
|
||||
|
||||
// Check if the storage version is as expected
|
||||
for _, resource := range resourceList.APIResources {
|
||||
if resource.Kind == crdKind && resource.StorageVersionHash == expectedHash {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1056,7 +1057,7 @@ func (svm *svmTest) setupServerCert(t *testing.T) *certContext {
|
||||
func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bool {
|
||||
t.Helper()
|
||||
|
||||
data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
|
||||
data, err := svm.getRawCRFromETCD(t, crdGroup, crdName+"s", defaultNamespace, crName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get CR from etcd: %v", err)
|
||||
}
|
||||
@ -1135,7 +1136,7 @@ func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, c
|
||||
|
||||
for crName, version := range crVersions {
|
||||
// get CR from etcd
|
||||
data, err := svm.getRawCRFromETCD(t, crName, defaultNamespace, crdGroup, crdName+"s")
|
||||
data, err := svm.getRawCRFromETCD(t, crdGroup, crdName+"s", defaultNamespace, crName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get CR from etcd: %v", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user