Merge pull request #126107 from enj/enj/i/svm_not_found_err

svm: set UID and RV on SSA patch to cause conflict on logical create
This commit is contained in:
Kubernetes Prow Robot 2024-07-20 08:18:01 -07:00 committed by GitHub
commit 892acaa6a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 413 additions and 189 deletions

View File

@ -54,7 +54,11 @@ func startSVMController(
return nil, true, fmt.Errorf("storage version migrator requires garbage collector")
}
// svm controller can make a lot of requests during migration, keep it fast
config := controllerContext.ClientBuilder.ConfigOrDie(controllerName)
config.QPS *= 20
config.Burst *= 100
client := controllerContext.ClientBuilder.ClientOrDie(controllerName)
informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()

View File

@ -199,7 +199,7 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource does not exist in discovery"),
metav1.UpdateOptions{},
)
if err != nil {

View File

@ -204,27 +204,35 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
}
gvr := getGVRFromResource(toBeProcessedSVM)
resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr)
// prevent unsynced monitor from blocking forever
// use a short timeout so that we can fail quickly and possibly handle other migrations while this monitor gets ready.
monCtx, monCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer monCtxCancel()
resourceMonitor, errMonitor := svmc.dependencyGraphBuilder.GetMonitor(monCtx, gvr)
if resourceMonitor != nil {
if err != nil {
if errMonitor != nil {
// non nil monitor indicates that error is due to resource not being synced
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again")
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again: %w", errMonitor)
}
} else {
logger.V(4).Error(errMonitor, "resource does not exist in GC", "gvr", gvr.String())
// our GC cache could be missing a recently created custom resource, so give it some time to catch up
// we resync discovery every 30 seconds so twice that should be sufficient
if toBeProcessedSVM.CreationTimestamp.Add(time.Minute).After(time.Now()) {
return fmt.Errorf("resource does not exist in GC, requeuing to attempt again: %w", errMonitor)
}
// we can't migrate a resource that doesn't exist in the GC
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource not found"),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String())
return nil
return errStatus
}
gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion())
@ -244,7 +252,7 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason),
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason, ""),
metav1.UpdateOptions{},
)
if err != nil {
@ -255,60 +263,72 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
if err != nil {
return err
}
typeMeta := metav1.TypeMeta{}
// ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure
// process storage migration
for _, obj := range resourceMonitor.Store.List() {
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
typeMeta := typeMetaUIDRV{}
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
// set UID so that when a resource gets deleted, we get an "uid mismatch"
// conflict error instead of trying to create it.
typeMeta.UID = accessor.GetUID()
// set RV so that when a resource gets updated or deleted+recreated, we get an "object has been modified"
// conflict error. we do not actually need to do anything special for the updated case because if RV
// was not set, it would just result in no-op request. but for the deleted+recreated case, if RV is
// not set but UID is set, we would get an immutable field validation error. hence we must set both.
typeMeta.ResourceVersion = accessor.GetResourceVersion()
data, err := json.Marshal(typeMeta)
if err != nil {
return err
}
// ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure
// process storage migration
for _, gvrKey := range resourceMonitor.Store.ListKeys() {
namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey)
if err != nil {
return err
}
_, err = svmc.dynamicClient.Resource(gvr).
Namespace(namespace).
_, errPatch := svmc.dynamicClient.Resource(gvr).
Namespace(accessor.GetNamespace()).
Patch(ctx,
name,
accessor.GetName(),
types.ApplyPatchType,
data,
metav1.PatchOptions{
FieldManager: svmc.controllerName,
},
)
if err != nil {
// in case of NotFound or Conflict, we can stop processing migration for that resource
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
// in case of conflict, we can stop processing migration for that resource because it has either been
// - updated, meaning that migration has already been performed
// - deleted, meaning that migration is not needed
// - deleted and recreated, meaning that migration has already been performed
if apierrors.IsConflict(errPatch) {
logger.V(6).Info("Resource ignored due to conflict", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "err", errPatch)
continue
}
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
if errPatch != nil {
logger.V(4).Error(errPatch, "Failed to migrate the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "reason", apierrors.ReasonForError(errPatch))
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "migration encountered unhandled error"),
metav1.UpdateOptions{},
)
if err != nil {
return err
}
logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err))
return nil
return errStatus
// Todo: add retry for scenarios where API server returns rate limiting error
}
logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String())
logger.V(4).Info("Successfully migrated the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String())
}
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
StorageVersionMigrations().
UpdateStatus(
ctx,
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason),
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason, ""),
metav1.UpdateOptions{},
)
if err != nil {
@ -318,3 +338,13 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime))
return nil
}
// typeMetaUIDRV is the minimal object shape marshaled as the SSA patch body:
// apiVersion/kind plus metadata.uid and metadata.resourceVersion. Setting UID
// and RV makes the apply fail with a conflict instead of logically creating
// the object when it has been deleted (or deleted and recreated) — see the
// comments where this type is populated in the sync loop.
type typeMetaUIDRV struct {
	metav1.TypeMeta    `json:",inline"`
	objectMetaUIDandRV `json:"metadata,omitempty"`
}

// objectMetaUIDandRV carries only the UID and resourceVersion metadata fields
// so the marshaled patch contains nothing else.
type objectMetaUIDandRV struct {
	UID             types.UID `json:"uid,omitempty"`
	ResourceVersion string    `json:"resourceVersion,omitempty"`
}

View File

@ -62,7 +62,7 @@ func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType sv
func setStatusConditions(
toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration,
conditionType svmv1alpha1.MigrationConditionType,
reason string,
reason, message string,
) *svmv1alpha1.StorageVersionMigration {
if !IsConditionTrue(toBeUpdatedSVM, conditionType) {
if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed {
@ -77,6 +77,7 @@ func setStatusConditions(
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Now(),
Reason: reason,
Message: message,
})
}

View File

@ -486,7 +486,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
Name: saRolePrefix + "storage-version-migrator-controller",
},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(),
// need list to get current RV for any resource
// need patch for SSA of any resource
// need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set
rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(),
},
})

View File

@ -17,6 +17,7 @@ limitations under the License.
package converter
import (
"bytes"
"fmt"
"io"
"net/http"
@ -131,7 +132,7 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
return
}
klog.V(2).Infof("handling request: %v", body)
klog.V(2).Infof("handling request: %s", string(body))
obj, gvk, err := serializer.Decode(body, nil, nil)
if err != nil {
msg := fmt.Sprintf("failed to deserialize body (%v) with error %v", string(body), err)
@ -152,7 +153,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
}
convertReview.Response = doConversionV1beta1(convertReview.Request, convert)
convertReview.Response.UID = convertReview.Request.UID
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
// reset the request, it is not needed in a response.
convertReview.Request = &v1beta1.ConversionRequest{}
@ -167,7 +167,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
}
convertReview.Response = doConversionV1(convertReview.Request, convert)
convertReview.Response.UID = convertReview.Request.UID
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
// reset the request, it is not needed in a response.
convertReview.Request = &v1.ConversionRequest{}
@ -187,12 +186,14 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
http.Error(w, msg, http.StatusBadRequest)
return
}
err = outSerializer.Encode(responseObj, w)
var buf bytes.Buffer
err = outSerializer.Encode(responseObj, io.MultiWriter(w, &buf))
if err != nil {
klog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
klog.V(2).Infof("sending response: %s", buf.String())
}
// ServeExampleConvert servers endpoint for the example converter defined as convertExampleCRD function.

View File

@ -19,20 +19,24 @@ package storageversionmigrator
import (
"bytes"
"context"
"strconv"
"sync"
"testing"
"time"
etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/klog/v2/ktesting"
"go.uber.org/goleak"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
etcd3watcher "k8s.io/apiserver/pkg/storage/etcd3"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientgofeaturegate "k8s.io/client-go/features"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
)
// TestStorageVersionMigration is an integration test that verifies storage version migration works.
@ -50,7 +54,7 @@ func TestStorageVersionMigration(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
// this makes the test super responsive. It's set to a default of 1 minute.
encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Millisecond
encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
@ -152,7 +156,12 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)
// decode errors are expected when using conversion webhooks
etcd3watcher.TestOnlySetFatalOnDecodeError(false)
defer etcd3watcher.TestOnlySetFatalOnDecodeError(true)
t.Cleanup(func() { etcd3watcher.TestOnlySetFatalOnDecodeError(true) })
framework.GoleakCheck(t, // block test clean up and let any lingering watches complete before making decode errors fatal again
goleak.IgnoreTopFunction("k8s.io/kubernetes/vendor/gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/moby/spdystream.(*Connection).shutdown"),
)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
@ -163,6 +172,9 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
svmTest := svmSetup(ctx, t)
certCtx := svmTest.setupServerCert(t)
// simulate monkeys creating and deleting CRs
svmTest.createChaos(ctx, t)
// create CRD with v1 serving and storage
crd := svmTest.createCRD(t, crdName, crdGroup, certCtx, v1CRDVersion)
@ -238,7 +250,7 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create SVM resource: %v", err)
}
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name); !ok {
if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, "triggercr"); !ok {
t.Fatalf("CRD not migrated")
}
@ -261,10 +273,73 @@ func TestStorageVersionMigrationWithCRD(t *testing.T) {
shutdownServer()
// assert RV and Generations of CRs
svmTest.validateRVAndGeneration(ctx, t, crVersions)
svmTest.validateRVAndGeneration(ctx, t, crVersions, "v2")
// assert v2 CRs can be listed
if err := svmTest.listCR(ctx, t, "v2"); err != nil {
t.Fatalf("Failed to list CRs at version v2: %v", err)
}
}
// TestStorageVersionMigrationDuringChaos serves as a stress test for the SVM controller.
// It creates a CRD and a reasonable number of static instances for that resource.
// It also continuously creates and deletes instances of that resource.
// During all of this, it attempts to perform multiple parallel migrations of the resource.
// It asserts that all migrations are successful and that none of the static instances
// were changed after they were initially created (as the migrations must be a no-op).
func TestStorageVersionMigrationDuringChaos(t *testing.T) {
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionMigrator, true)
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, featuregate.Feature(clientgofeaturegate.InformerResourceVersion), true)

	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	t.Cleanup(cancel)

	svmTest := svmSetup(ctx, t)
	// background workers continuously create and delete CRs for the duration of the test
	svmTest.createChaos(ctx, t)

	// nil cert context: no conversion webhook, CRD serves and stores v1 only
	crd := svmTest.createCRD(t, crdName, crdGroup, nil, v1CRDVersion)

	crVersions := make(map[string]versions)

	// static CRs; their RV and generation are recorded so we can assert later
	// that the parallel migrations never modified them
	for i := range 50 { // a more realistic number of total resources
		cr := svmTest.createCR(ctx, t, "created-cr-"+strconv.Itoa(i), "v1")
		crVersions[cr.GetName()] = versions{
			generation:  cr.GetGeneration(),
			rv:          cr.GetResourceVersion(),
			isRVUpdated: false, // none of these CRs should change due to migrations
		}
	}

	var wg sync.WaitGroup
	const migrations = 10 // more than the total workers of SVM
	wg.Add(migrations)
	for i := range migrations {
		i := i
		go func() {
			defer wg.Done()

			svm, err := svmTest.createSVMResource(
				ctx, t, "chaos-svm-"+strconv.Itoa(i),
				svmv1alpha1.GroupVersionResource{
					Group:    crd.Spec.Group,
					Version:  "v1",
					Resource: crd.Spec.Names.Plural,
				},
			)
			// use t.Errorf (not Fatalf) inside goroutines so a failure is
			// recorded without terminating the wrong goroutine
			if err != nil {
				t.Errorf("Failed to create SVM resource: %v", err)
				return
			}
			triggerCRName := "chaos-trigger-" + strconv.Itoa(i)
			if ok := svmTest.isCRDMigrated(ctx, t, svm.Name, triggerCRName); !ok {
				t.Errorf("CRD not migrated")
				return
			}
		}()
	}
	wg.Wait()

	// all migrations must have been no-ops for the static CRs:
	// RV and generation unchanged, still readable at v1
	svmTest.validateRVAndGeneration(ctx, t, crVersions, "v1")
}

View File

@ -31,6 +31,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -45,33 +46,29 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
endpointsdiscovery "k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/controller-manager/pkg/informerfactory"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
kubecontrollermanagertesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversionmigrator"
"k8s.io/kubernetes/test/images/agnhost/crd-conversion-webhook/converter"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/etcd"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/kubeconfig"
utilnet "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -107,6 +104,11 @@ rules:
- group: ""
resources: ["secrets"]
verbs: ["patch"]
- level: Metadata
resources:
- group: "stable.example.com"
resources: ["testcrds"]
users: ["system:serviceaccount:kube-system:storage-version-migrator-controller"]
`,
"initialEncryptionConfig": `
kind: EncryptionConfiguration
@ -272,81 +274,32 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
"--audit-log-version", "audit.k8s.io/v1",
"--audit-log-mode", "blocking",
"--audit-log-path", logFile.Name(),
"--authorization-mode=RBAC",
}
storageConfig := framework.SharedEtcd()
server := kubeapiservertesting.StartTestServerOrDie(t, nil, apiServerFlags, storageConfig)
kubeConfigFile := createKubeConfigFileForRestConfig(t, server.ClientConfig)
kcm := kubecontrollermanagertesting.StartTestServerOrDie(ctx, []string{
"--kubeconfig=" + kubeConfigFile,
"--controllers=garbagecollector,svm", // these are the only controllers needed for this test
"--use-service-account-credentials=true", // exercise RBAC of SVM controller
"--leader-elect=false", // KCM leader election calls os.Exit when it ends, so it is easier to just turn it off altogether
})
clientSet, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("error in create clientset: %v", err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
rvDiscoveryClient, err := discovery.NewDiscoveryClientForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("failed to create discovery client: %v", err)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
metadataClient, err := metadata.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("failed to create metadataClient: %v", err)
}
dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("error in create dynamic client: %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
ctx,
clientSet,
metadataClient,
restMapper,
garbagecollector.DefaultIgnoredResources(),
informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
alwaysStarted,
)
if err != nil {
t.Fatalf("error while creating garbage collector: %v", err)
}
startGC := func() {
syncPeriod := 5 * time.Second
go wait.Until(func() {
restMapper.Reset()
}, syncPeriod, ctx.Done())
go gc.Run(ctx, 1)
go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
}
svmController := storageversionmigrator.NewSVMController(
ctx,
clientSet,
dynamicClient,
sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
names.StorageVersionMigratorController,
restMapper,
gc.GetDependencyGraphBuilder(),
)
rvController := storageversionmigrator.NewResourceVersionController(
ctx,
clientSet,
rvDiscoveryClient,
metadataClient,
sharedInformers.Storagemigration().V1alpha1().StorageVersionMigrations(),
restMapper,
)
// Start informer and controllers
sharedInformers.Start(ctx.Done())
startGC()
go svmController.Run(ctx)
go rvController.Run(ctx)
svmTest := &svmTest{
storageConfig: storageConfig,
@ -361,6 +314,19 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
}
t.Cleanup(func() {
var validCodes = sets.New[int32](http.StatusOK, http.StatusConflict) // make sure SVM controller never creates
_ = svmTest.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool {
if event.User != "system:serviceaccount:kube-system:storage-version-migrator-controller" {
return false
}
if !validCodes.Has(event.Code) {
t.Errorf("svm controller had invalid response code for event: %#v", event)
return true
}
return false
})
kcm.TearDownFn()
server.TearDownFn()
utiltesting.CloseAndRemove(t, svmTest.logFile)
utiltesting.CloseAndRemove(t, svmTest.policyFile)
@ -373,6 +339,18 @@ func svmSetup(ctx context.Context, t *testing.T) *svmTest {
return svmTest
}
// createKubeConfigFileForRestConfig serializes the given rest config into a
// kubeconfig file under the test's temporary directory and returns its path.
// The test is failed immediately if the file cannot be written.
func createKubeConfigFileForRestConfig(t *testing.T, restConfig *rest.Config) string {
	t.Helper()

	path := filepath.Join(t.TempDir(), "kubeconfig.yaml")
	cfg := kubeconfig.CreateKubeConfig(restConfig)
	if writeErr := clientcmd.WriteToFile(*cfg, path); writeErr != nil {
		t.Fatal(writeErr)
	}
	return path
}
func createEncryptionConfig(t *testing.T, encryptionConfig string) (
filePathForEncryptionConfig string,
err error,
@ -606,29 +584,48 @@ func (svm *svmTest) waitForResourceMigration(
) bool {
t.Helper()
var isMigrated bool
var triggerOnce sync.Once
err := wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
wait.ForeverTestTimeout,
5*time.Minute,
true,
func(ctx context.Context) (bool, error) {
svmResource, err := svm.getSVM(ctx, t, svmName)
if err != nil {
t.Fatalf("Failed to get SVM resource: %v", err)
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
t.Logf("%q SVM has failed migration, %#v", svmName, svmResource.Status.Conditions)
return false, fmt.Errorf("SVM has failed migration")
}
if svmResource.Status.ResourceVersion == "" {
t.Logf("%q SVM has no resourceVersion", svmName)
return false, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
isMigrated = true
t.Logf("%q SVM has completed migration", svmName)
return true, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
t.Logf("%q SVM migration is running, %#v", svmName, svmResource.Status.Conditions)
return false, nil
}
t.Logf("%q SVM has not started migration, %#v", svmName, svmResource.Status.Conditions)
// We utilize the LastSyncResourceVersion of the Garbage Collector (GC) to ensure that the cache is up-to-date before proceeding with the migration.
// However, in a quiet cluster, the GC may not be updated unless there is some activity or the watch receives a bookmark event after every 10 minutes.
// To expedite the update of the GC cache, we create a dummy secret and then promptly delete it.
// This action forces the GC to refresh its cache, enabling us to proceed with the migration.
// At this point we know that the RV has been set on the SVM resource, so the trigger will always have a higher RV.
// We only need to do this once.
triggerOnce.Do(func() {
_, err = svm.createSecret(ctx, t, triggerSecretName, defaultNamespace)
if err != nil {
t.Fatalf("Failed to create secret: %v", err)
@ -637,6 +634,66 @@ func (svm *svmTest) waitForResourceMigration(
if err != nil {
t.Fatalf("Failed to delete secret: %v", err)
}
})
return false, nil
},
)
if err != nil {
t.Logf("Failed to wait for resource migration for SVM %q with secret %q: %v", svmName, name, err)
return false
}
err = wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
wait.ForeverTestTimeout,
true,
func(_ context.Context) (bool, error) {
want := utils.AuditEvent{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
Verb: "patch",
Code: http.StatusOK,
User: "system:serviceaccount:kube-system:storage-version-migrator-controller",
Resource: "secrets",
Namespace: "default",
AuthorizeDecision: "allow",
RequestObject: false,
ResponseObject: false,
}
if seen := svm.countMatchingAuditEvents(t, func(event utils.AuditEvent) bool { return reflect.DeepEqual(event, want) }); expectedEvents > seen {
t.Logf("audit log did not contain %d expected audit events, only has %d", expectedEvents, seen)
return false, nil
}
return true, nil
},
)
if err != nil {
t.Logf("Failed to wait for audit logs events for SVM %q with secret %q: %v", svmName, name, err)
return false
}
return true
}
// countMatchingAuditEvents returns how many events currently in the audit log
// satisfy the given predicate.
func (svm *svmTest) countMatchingAuditEvents(t *testing.T, f func(utils.AuditEvent) bool) int {
	t.Helper()

	count := 0
	events := svm.getAuditEvents(t)
	for i := range events {
		if f(events[i]) {
			count++
		}
	}
	return count
}
func (svm *svmTest) getAuditEvents(t *testing.T) []utils.AuditEvent {
t.Helper()
stream, err := os.Open(svm.logFile.Name())
if err != nil {
@ -648,40 +705,12 @@ func (svm *svmTest) waitForResourceMigration(
}
}()
missingReport, err := utils.CheckAuditLines(
stream,
[]utils.AuditEvent{
{
Level: auditinternal.LevelMetadata,
Stage: auditinternal.StageResponseComplete,
RequestURI: fmt.Sprintf("/api/v1/namespaces/%s/secrets/%s?fieldManager=storage-version-migrator-controller", defaultNamespace, name),
Verb: "patch",
Code: 200,
User: "system:apiserver",
Resource: "secrets",
Namespace: "default",
AuthorizeDecision: "allow",
RequestObject: false,
ResponseObject: false,
},
},
auditv1.SchemeGroupVersion,
)
missingReport, err := utils.CheckAuditLines(stream, nil, auditv1.SchemeGroupVersion)
if err != nil {
t.Fatalf("Failed to check audit log: %v", err)
}
if (len(missingReport.MissingEvents) != 0) && (expectedEvents < missingReport.NumEventsChecked) {
isMigrated = false
}
return isMigrated, nil
},
)
if err != nil {
return false
}
return isMigrated
return missingReport.AllEvents
}
func (svm *svmTest) createCRD(
@ -708,7 +737,12 @@ func (svm *svmTest) createCRD(
},
Scope: apiextensionsv1.NamespaceScoped,
Versions: crdVersions,
Conversion: &apiextensionsv1.CustomResourceConversion{
PreserveUnknownFields: false,
},
}
if certCtx != nil {
crd.Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
Strategy: apiextensionsv1.WebhookConverter,
Webhook: &apiextensionsv1.WebhookConversion{
ClientConfig: &apiextensionsv1.WebhookClientConfig{
@ -719,9 +753,7 @@ func (svm *svmTest) createCRD(
},
ConversionReviewVersions: []string{"v1", "v2"},
},
},
PreserveUnknownFields: false,
},
}
}
apiextensionsclient, err := apiextensionsclientset.NewForConfig(svm.clientConfig)
@ -809,7 +841,12 @@ func (svm *svmTest) waitForCRDUpdate(
}
}
func (svm *svmTest) createCR(ctx context.Context, t *testing.T, crName, version string) *unstructured.Unstructured {
// testingT is the subset of *testing.T needed by the CR create/delete helpers.
// It exists so those helpers can also be driven by ignoreFailures, which
// discards failures from requests that race with the rest of the test.
type testingT interface {
	Helper()
	Fatalf(format string, args ...any)
}
func (svm *svmTest) createCR(ctx context.Context, t testingT, crName, version string) *unstructured.Unstructured {
t.Helper()
crdResource := schema.GroupVersionResource{
@ -868,7 +905,7 @@ func (svm *svmTest) listCR(ctx context.Context, t *testing.T, version string) er
return err
}
func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version string) {
func (svm *svmTest) deleteCR(ctx context.Context, t testingT, name, version string) {
t.Helper()
crdResource := schema.GroupVersionResource{
Group: crdGroup,
@ -883,7 +920,9 @@ func (svm *svmTest) deleteCR(ctx context.Context, t *testing.T, name, version st
func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, certCtx *certContext) context.CancelFunc {
t.Helper()
http.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
mux := http.NewServeMux()
mux.HandleFunc(fmt.Sprintf("/%s", webhookHandler), converter.ServeExampleConvert)
block, _ := pem.Decode(certCtx.key)
if block == nil {
@ -905,6 +944,7 @@ func (svm *svmTest) createConversionWebhook(ctx context.Context, t *testing.T, c
server := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", servicePort),
Handler: mux,
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{
{
@ -1030,29 +1070,53 @@ func (svm *svmTest) isCRStoredAtVersion(t *testing.T, version, crName string) bo
return obj.GetAPIVersion() == fmt.Sprintf("%s/%s", crdGroup, version)
}
func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName string) bool {
func (svm *svmTest) isCRDMigrated(ctx context.Context, t *testing.T, crdSVMName, triggerCRName string) bool {
t.Helper()
var triggerOnce sync.Once
err := wait.PollUntilContextTimeout(
ctx,
500*time.Millisecond,
1*time.Minute,
5*time.Minute,
true,
func(ctx context.Context) (bool, error) {
triggerCR := svm.createCR(ctx, t, "triggercr", "v1")
svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
svmResource, err := svm.getSVM(ctx, t, crdSVMName)
if err != nil {
t.Fatalf("Failed to get SVM resource: %v", err)
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationFailed) {
t.Logf("%q SVM has failed migration, %#v", crdSVMName, svmResource.Status.Conditions)
return false, fmt.Errorf("SVM has failed migration")
}
if svmResource.Status.ResourceVersion == "" {
t.Logf("%q SVM has no resourceVersion", crdSVMName)
return false, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationSucceeded) {
t.Logf("%q SVM has completed migration", crdSVMName)
return true, nil
}
if storageversionmigrator.IsConditionTrue(svmResource, svmv1alpha1.MigrationRunning) {
t.Logf("%q SVM migration is running, %#v", crdSVMName, svmResource.Status.Conditions)
return false, nil
}
t.Logf("%q SVM has not started migration, %#v", crdSVMName, svmResource.Status.Conditions)
// at this point we know that the RV has been set on the SVM resource,
// and we need to make sure that the GC list RV has caught up to that without waiting for a watch bookmark.
// we cannot trigger this any earlier as the rest mapper of the RV controller can be delayed
// and thus may not have observed the new CRD yet. we only need to do this once.
triggerOnce.Do(func() {
triggerCR := svm.createCR(ctx, t, triggerCRName, "v1")
svm.deleteCR(ctx, t, triggerCR.GetName(), "v1")
})
return false, nil
},
)
@ -1065,7 +1129,7 @@ type versions struct {
isRVUpdated bool
}
func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions) {
func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, crVersions map[string]versions, getCRVersion string) {
t.Helper()
for crName, version := range crVersions {
@ -1083,12 +1147,53 @@ func (svm *svmTest) validateRVAndGeneration(ctx context.Context, t *testing.T, c
}
// validate resourceVersion and generation
crVersion := svm.getCR(ctx, t, crName, "v2").GetResourceVersion()
if version.isRVUpdated && crVersion == version.rv {
crVersion := svm.getCR(ctx, t, crName, getCRVersion).GetResourceVersion()
isRVUnchanged := crVersion == version.rv
if version.isRVUpdated && isRVUnchanged {
t.Fatalf("ResourceVersion of CR %s should not be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
}
if !version.isRVUpdated && !isRVUnchanged {
t.Fatalf("ResourceVersion of CR %s should be equal. Expected: %s, Got: %s", crName, version.rv, crVersion)
}
if obj.GetGeneration() != version.generation {
t.Fatalf("Generation of CR %s should be equal. Expected: %d, Got: %d", crName, version.generation, obj.GetGeneration())
}
}
}
// createChaos starts background workers that repeatedly create and delete
// custom resources for the lifetime of the test. The workers are stopped and
// fully drained via t.Cleanup. Failures from these requests are deliberately
// ignored because they are not coordinated with the rest of the test.
func (svm *svmTest) createChaos(ctx context.Context, t *testing.T) {
	const workers = 10

	chaosCtx, stop := context.WithCancel(ctx)
	silentT := ignoreFailures{} // uncoordinated requests are allowed to fail

	var wg sync.WaitGroup
	wg.Add(workers)
	for worker := 0; worker < workers; worker++ {
		name := "chaos-cr-" + strconv.Itoa(worker)
		go func() {
			defer wg.Done()
			// loop until the test ends; each iteration creates then deletes the same CR
			for chaosCtx.Err() == nil {
				_ = svm.createCR(chaosCtx, silentT, name, "v1")
				svm.deleteCR(chaosCtx, silentT, name, "v1")
			}
		}()
	}

	t.Cleanup(func() {
		stop()
		wg.Wait()
	})
}
// ignoreFailures is a testingT implementation that silently discards all
// failures. It is passed to CR helpers whose requests are expected to race
// with the rest of the test and therefore may legitimately fail.
type ignoreFailures struct{}

func (ignoreFailures) Helper()                           {}
func (ignoreFailures) Fatalf(format string, args ...any) {}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/uuid"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
etcdserver "k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -69,6 +70,7 @@ func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfi
// the loopback client config uses a loopback cert with different SNI. We need to use the "real"
// cert, so we'll hope we aren't hacked during a unit test and instead load it from the server we started.
wardleToKASKubeClientConfig := rest.CopyConfig(kubeClientConfig)
wardleToKASKubeClientConfig.ServerName = "" // reset SNI to use the "real" cert
servingCerts, _, err := cert.GetServingCertificatesForURL(wardleToKASKubeClientConfig.Host, "")
if err != nil {

View File

@ -66,6 +66,7 @@ type MissingEventsReport struct {
LastEventChecked *auditinternal.Event
NumEventsChecked int
MissingEvents []AuditEvent
AllEvents []AuditEvent
}
// String returns a human readable string representation of the report
@ -118,6 +119,7 @@ func CheckAuditLinesFiltered(stream io.Reader, expected []AuditEvent, version sc
}
expectations.Mark(event)
missingReport.AllEvents = append(missingReport.AllEvents, event)
}
if err := scanner.Err(); err != nil {
return missingReport, err

View File

@ -45,6 +45,7 @@ func CreateKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
cluster := clientcmdapi.NewCluster()
cluster.Server = clientCfg.Host
cluster.TLSServerName = clientCfg.ServerName
cluster.CertificateAuthority = clientCfg.CAFile
if len(cluster.CertificateAuthority) == 0 {
cluster.CertificateAuthorityData = clientCfg.CAData