From e9e4acb1dde69243a6e675e58833ae7936df9ce5 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 2 Jun 2023 20:25:31 +0200 Subject: [PATCH] k8s.io/apiserver: remove skewed completion from EtcdOptions --- cmd/kube-apiserver/app/apiextensions.go | 6 +- cmd/kube-apiserver/app/server.go | 3 - pkg/controlplane/instance_test.go | 3 - .../pkg/cmd/server/options/options.go | 37 +--- .../integration/conversion/conversion_test.go | 5 +- .../test/integration/defaulting_test.go | 5 +- .../test/integration/fixtures/server.go | 13 +- .../test/integration/objectmeta_test.go | 5 +- .../src/k8s.io/apiserver/pkg/server/config.go | 3 + .../server/options/encryptionconfig/config.go | 104 +++++----- .../apiserver/pkg/server/options/etcd.go | 178 ++++++++---------- .../apiserver/pkg/server/options/etcd_test.go | 6 - .../pkg/server/options/recommended.go | 3 - .../pkg/storage/value/transformer.go | 6 + 14 files changed, 166 insertions(+), 211 deletions(-) diff --git a/cmd/kube-apiserver/app/apiextensions.go b/cmd/kube-apiserver/app/apiextensions.go index ee80cd9a7c9..3368580fa1c 100644 --- a/cmd/kube-apiserver/app/apiextensions.go +++ b/cmd/kube-apiserver/app/apiextensions.go @@ -71,17 +71,13 @@ func createAPIExtensionsConfig( apiextensionsapiserver.Scheme); err != nil { return nil, err } - crdRESTOptionsGetter, err := apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions) - if err != nil { - return nil, err - } apiextensionsConfig := &apiextensionsapiserver.Config{ GenericConfig: &genericapiserver.RecommendedConfig{ Config: genericConfig, SharedInformerFactory: externalInformers, }, ExtraConfig: apiextensionsapiserver.ExtraConfig{ - CRDRESTOptionsGetter: crdRESTOptionsGetter, + CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, genericConfig.StorageObjectCountTracker), MasterCount: masterCount, AuthResolverWrapper: authResolverWrapper, ServiceResolver: serviceResolver, diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 634722127f5..f76983de8be 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -402,9 +402,6 @@ func buildGenericConfig( } else { s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() } - if lastErr = s.Etcd.Complete(genericConfig.DrainedNotify(), genericConfig.AddPostStartHook); lastErr != nil { - return - } storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig diff --git a/pkg/controlplane/instance_test.go b/pkg/controlplane/instance_test.go index 1655b92b5ee..21d79a54b28 100644 --- a/pkg/controlplane/instance_test.go +++ b/pkg/controlplane/instance_test.go @@ -94,9 +94,6 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio etcdOptions := options.NewEtcdOptions(storageConfig) // unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests etcdOptions.EnableWatchCache = false - if err := etcdOptions.Complete(config.GenericConfig.DrainedNotify(), config.GenericConfig.AddPostStartHook); err != nil { - t.Fatal(err) - } err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig) if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go index b208363ffff..ef77ee42e91 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go @@ -21,7 +21,6 @@ import ( "io" "net" "net/url" - "reflect" "github.com/spf13/pflag" oteltrace "go.opentelemetry.io/otel/trace" @@ -36,6 +35,8 @@ import ( genericregistry "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" + storagevalue "k8s.io/apiserver/pkg/storage/value" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/apiserver/pkg/util/openapi" "k8s.io/apiserver/pkg/util/proxy" "k8s.io/apiserver/pkg/util/webhook" @@ -111,16 +112,12 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err if err := o.APIEnablement.ApplyTo(&serverConfig.Config, apiserver.DefaultAPIResourceConfigSource(), apiserver.Scheme); err != nil { return nil, err } - crdRESTOptionsGetter, err := NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd) - if err != nil { - return nil, err - } serverConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions), openapinamer.NewDefinitionNamer(apiserver.Scheme, scheme.Scheme)) config := &apiserver.Config{ GenericConfig: serverConfig, ExtraConfig: apiserver.ExtraConfig{ - CRDRESTOptionsGetter: crdRESTOptionsGetter, + CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd, serverConfig.ResourceTransformers, serverConfig.StorageObjectCountTracker), ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()}, AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, oteltrace.NewNoopTracerProvider()), }, @@ -129,30 +126,16 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err } // NewCRDRESTOptionsGetter create a RESTOptionsGetter for CustomResources. -// This works on a copy of the etcd options so we don't mutate originals. -// We assume that the input etcd options have been completed already. +// // Avoid messing with anything outside of changes to StorageConfig as that // may lead to unexpected behavior when the options are applied. -func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) (genericregistry.RESTOptionsGetter, error) { - etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme - etcdOptions.WatchCacheSizes = nil // this control is not provided for custom resources - etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks +func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions, resourceTransformers storagevalue.ResourceTransformers, tracker flowcontrolrequest.StorageObjectCountTracker) genericregistry.RESTOptionsGetter { + etcdOptionsCopy := etcdOptions + etcdOptionsCopy.StorageConfig.Codec = unstructured.UnstructuredJSONScheme + etcdOptionsCopy.StorageConfig.StorageObjectCountTracker = tracker + etcdOptionsCopy.WatchCacheSizes = nil // this control is not provided for custom resources - // creates a generic apiserver config for etcdOptions to mutate - c := genericapiserver.Config{} - if err := etcdOptions.ApplyTo(&c); err != nil { - return nil, err - } - restOptionsGetter := c.RESTOptionsGetter - if restOptionsGetter == nil { - return nil, fmt.Errorf("server.Config RESTOptionsGetter should not be nil") - } - // sanity check that no other fields are set - c.RESTOptionsGetter = nil - if !reflect.DeepEqual(c, genericapiserver.Config{}) { - return nil, fmt.Errorf("only RESTOptionsGetter should have been mutated in server.Config") - } - return restOptionsGetter, nil + return etcdOptions.CreateRESTOptionsGetter(&genericoptions.SimpleStorageFactory{StorageConfig: etcdOptionsCopy.StorageConfig}, resourceTransformers) } type serviceResolver struct { diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion/conversion_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion/conversion_test.go index c93cbe99cc5..2107cc35f34 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion/conversion_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/conversion/conversion_test.go @@ -182,10 +182,7 @@ func testWebhookConverter(t *testing.T, watchCache bool) { crd := multiVersionFixture.DeepCopy() - RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd) - if err != nil { - t.Fatal(err) - } + RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil) restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/defaulting_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/defaulting_test.go index 2554a6ef9de..dcfdf2f7525 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/defaulting_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/defaulting_test.go @@ -658,10 +658,7 @@ func TestCustomResourceDefaultingOfMetaFields(t *testing.T) { t.Logf("CR created: %#v", returnedFoo.UnstructuredContent()) // get persisted object - RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd) - if err != nil { - t.Fatal(err) - } + RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil) restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go index 486e7a90d45..fb35029fc3e 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go @@ -31,6 +31,8 @@ import ( serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing" "k8s.io/apimachinery/pkg/runtime/schema" + genericapiserver "k8s.io/apiserver/pkg/server" + storagevalue "k8s.io/apiserver/pkg/storage/value" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) @@ -145,10 +147,15 @@ func StartDefaultServerWithClientsAndEtcd(t servertesting.Logger, extraFlags ... return nil, nil, nil, nil, "", err } - RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd) - if err != nil { - return nil, nil, nil, nil, "", err + var resourceTransformers storagevalue.ResourceTransformers + if len(options.RecommendedOptions.Etcd.EncryptionProviderConfigFilepath) != 0 { + // be clever in tests to reconstruct the transformers, for encryption integration tests + config := genericapiserver.Config{} + options.RecommendedOptions.Etcd.ApplyTo(&config) + resourceTransformers = config.ResourceTransformers } + + RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, resourceTransformers, nil) restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: "hopefully-ignored-group", Resource: "hopefully-ignored-resources"}) if err != nil { return nil, nil, nil, nil, "", err diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go index b5745bed031..9ac7387d7ab 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/objectmeta_test.go @@ -132,10 +132,7 @@ func TestInvalidObjectMetaInStorage(t *testing.T) { t.Fatal(err) } - RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd) - if err != nil { - t.Fatal(err) - } + RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil) restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: noxuDefinition.Spec.Group, Resource: noxuDefinition.Spec.Names.Plural}) if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 9dc87506a40..bb11f22ec67 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -65,6 +65,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" + storagevalue "k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" @@ -190,6 +191,8 @@ type Config struct { // SkipOpenAPIInstallation avoids installing the OpenAPI handler if set to true. SkipOpenAPIInstallation bool + // ResourceTransformers are used to transform resources from and to etcd, e.g. encryption. + ResourceTransformers storagevalue.ResourceTransformers // RESTOptionsGetter is used to construct RESTStorage types via the generic registry. RESTOptionsGetter genericregistry.RESTOptionsGetter diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index 796cc6b03dc..8fb0ae76825 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -43,7 +43,7 @@ import ( "k8s.io/apiserver/pkg/apis/config/validation" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server/healthz" - "k8s.io/apiserver/pkg/storage/value" + storagevalue "k8s.io/apiserver/pkg/storage/value" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope" envelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" @@ -159,7 +159,7 @@ func (h *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker { // EncryptionConfiguration represents the parsed and normalized encryption configuration for the apiserver. type EncryptionConfiguration struct { // Transformers is a list of value.Transformer that will be used to encrypt and decrypt data. - Transformers map[schema.GroupResource]value.Transformer + Transformers map[schema.GroupResource]storagevalue.Transformer // HealthChecks is a list of healthz.HealthChecker that will be used to check the health of the encryption providers. HealthChecks []healthz.HealthChecker @@ -207,7 +207,7 @@ func LoadEncryptionConfig(ctx context.Context, filepath string, reload bool) (*E // getTransformerOverridesAndKMSPluginHealthzCheckers creates the set of transformers and KMS healthz checks based on the given config. // It may launch multiple go routines whose lifecycle is controlled by ctx. // In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched. -func getTransformerOverridesAndKMSPluginHealthzCheckers(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) { +func getTransformerOverridesAndKMSPluginHealthzCheckers(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]storagevalue.Transformer, []healthz.HealthChecker, *kmsState, error) { var kmsHealthChecks []healthz.HealthChecker transformers, probes, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(ctx, config) if err != nil { @@ -228,8 +228,8 @@ type healthChecker interface { // getTransformerOverridesAndKMSPluginProbes creates the set of transformers and KMS probes based on the given config. // It may launch multiple go routines whose lifecycle is controlled by ctx. // In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched. -func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthChecker, *kmsState, error) { - resourceToPrefixTransformer := map[schema.GroupResource][]value.PrefixTransformer{} +func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]storagevalue.Transformer, []healthChecker, *kmsState, error) { + resourceToPrefixTransformer := map[schema.GroupResource][]storagevalue.PrefixTransformer{} var probes []healthChecker var kmsUsed kmsState @@ -268,11 +268,11 @@ func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apis probes = append(probes, p...) } - transformers := make(map[schema.GroupResource]value.Transformer, len(resourceToPrefixTransformer)) + transformers := make(map[schema.GroupResource]storagevalue.Transformer, len(resourceToPrefixTransformer)) for gr, transList := range resourceToPrefixTransformer { gr := gr transList := transList - transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...) + transformers[gr] = storagevalue.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...) } return transformers, probes, &kmsUsed, nil @@ -478,15 +478,15 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig // prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config. // It may launch multiple go routines whose lifecycle is controlled by ctx. // In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched. -func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) { - var transformers []value.PrefixTransformer +func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.ResourceConfiguration) ([]storagevalue.PrefixTransformer, []healthChecker, *kmsState, error) { + var transformers []storagevalue.PrefixTransformer var probes []healthChecker var kmsUsed kmsState for _, provider := range config.Providers { provider := provider var ( - transformer value.PrefixTransformer + transformer storagevalue.PrefixTransformer transformerErr error probe healthChecker used *kmsState @@ -497,7 +497,7 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res transformer, transformerErr = aesPrefixTransformer(provider.AESGCM, aestransformer.NewGCMTransformer, aesGCMTransformerPrefixV1) case provider.AESCBC != nil: - cbcTransformer := func(block cipher.Block) (value.Transformer, error) { + cbcTransformer := func(block cipher.Block) (storagevalue.Transformer, error) { return aestransformer.NewCBCTransformer(block), nil } transformer, transformerErr = aesPrefixTransformer(provider.AESCBC, cbcTransformer, aesCBCTransformerPrefixV1) @@ -513,7 +513,7 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res } case provider.Identity != nil: - transformer = value.PrefixTransformer{ + transformer = storagevalue.PrefixTransformer{ Transformer: identity.NewEncryptCheckTransformer(), Prefix: []byte{}, } @@ -532,10 +532,10 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res return transformers, probes, &kmsUsed, nil } -type blockTransformerFunc func(cipher.Block) (value.Transformer, error) +type blockTransformerFunc func(cipher.Block) (storagevalue.Transformer, error) -func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTransformerFunc, prefix string) (value.PrefixTransformer, error) { - var result value.PrefixTransformer +func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTransformerFunc, prefix string) (storagevalue.PrefixTransformer, error) { + var result storagevalue.PrefixTransformer if len(config.Keys) == 0 { return result, fmt.Errorf("aes provider has no valid keys") @@ -550,7 +550,7 @@ func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTran } } - keyTransformers := []value.PrefixTransformer{} + keyTransformers := []storagevalue.PrefixTransformer{} for _, keyData := range config.Keys { keyData := keyData @@ -569,26 +569,26 @@ func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTran // Create a new PrefixTransformer for this key keyTransformers = append(keyTransformers, - value.PrefixTransformer{ + storagevalue.PrefixTransformer{ Transformer: transformer, Prefix: []byte(keyData.Name + ":"), }) } // Create a prefixTransformer which can choose between these keys - keyTransformer := value.NewPrefixTransformers( + keyTransformer := storagevalue.NewPrefixTransformers( fmt.Errorf("no matching key was found for the provided AES transformer"), keyTransformers...) // Create a PrefixTransformer which shall later be put in a list with other providers - result = value.PrefixTransformer{ + result = storagevalue.PrefixTransformer{ Transformer: keyTransformer, Prefix: []byte(prefix), } return result, nil } -func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) (value.PrefixTransformer, error) { - var result value.PrefixTransformer +func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) (storagevalue.PrefixTransformer, error) { + var result storagevalue.PrefixTransformer if len(config.Keys) == 0 { return result, fmt.Errorf("secretbox provider has no valid keys") @@ -603,7 +603,7 @@ func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) } } - keyTransformers := []value.PrefixTransformer{} + keyTransformers := []storagevalue.PrefixTransformer{} for _, keyData := range config.Keys { keyData := keyData @@ -621,18 +621,18 @@ func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) // Create a new PrefixTransformer for this key keyTransformers = append(keyTransformers, - value.PrefixTransformer{ + storagevalue.PrefixTransformer{ Transformer: secretbox.NewSecretboxTransformer(keyArray), Prefix: []byte(keyData.Name + ":"), }) } // Create a prefixTransformer which can choose between these keys - keyTransformer := value.NewPrefixTransformers( + keyTransformer := storagevalue.NewPrefixTransformers( fmt.Errorf("no matching key was found for the provided Secretbox transformer"), keyTransformers...) // Create a PrefixTransformer which shall later be put in a list with other providers - result = value.PrefixTransformer{ + result = storagevalue.PrefixTransformer{ Transformer: keyTransformer, Prefix: []byte(secretboxTransformerPrefixV1), } @@ -665,13 +665,13 @@ func (s *kmsState) accumulate(other *kmsState) { // kmsPrefixTransformer creates a KMS transformer and probe based on the given KMS config. // It may launch multiple go routines whose lifecycle is controlled by ctx. // In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched. -func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfiguration) (value.PrefixTransformer, healthChecker, *kmsState, error) { +func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfiguration) (storagevalue.PrefixTransformer, healthChecker, *kmsState, error) { kmsName := config.Name switch config.APIVersion { case kmsAPIVersionV1: envelopeService, err := envelopeServiceFactory(ctx, config.Endpoint, config.Timeout.Duration) if err != nil { - return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %w", kmsName, err) + return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %w", kmsName, err) } probe := &kmsPluginProbe{ @@ -692,12 +692,12 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig case kmsAPIVersionV2: if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) { - return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName) + return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName) } envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Name, config.Timeout.Duration) if err != nil { - return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %w", kmsName, err) + return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %w", kmsName, err) } probe := &kmsv2PluginProbe{ @@ -748,7 +748,7 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig }) // using AES-GCM by default for encrypting data with KMSv2 - transformer := value.PrefixTransformer{ + transformer := storagevalue.PrefixTransformer{ Transformer: envelopekmsv2.NewEnvelopeTransformer(envelopeService, kmsName, probe.getCurrentState), Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"), } @@ -759,12 +759,12 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig }, nil default: - return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion) + return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion) } } -func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelope.Service, prefix string) value.PrefixTransformer { - baseTransformerFunc := func(block cipher.Block) (value.Transformer, error) { +func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelope.Service, prefix string) storagevalue.PrefixTransformer { + baseTransformerFunc := func(block cipher.Block) (storagevalue.Transformer, error) { gcm, err := aestransformer.NewGCMTransformer(block) if err != nil { return nil, err @@ -777,15 +777,15 @@ func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelop return unionTransformers{gcm, aestransformer.NewCBCTransformer(block)}, nil } - return value.PrefixTransformer{ + return storagevalue.PrefixTransformer{ Transformer: envelope.NewEnvelopeTransformer(envelopeService, int(*config.CacheSize), baseTransformerFunc), Prefix: []byte(prefix + config.Name + ":"), } } -type unionTransformers []value.Transformer +type unionTransformers []storagevalue.Transformer -func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) { +func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) (out []byte, stale bool, err error) { var errs []error for i := range u { transformer := u[i] @@ -804,7 +804,7 @@ func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte return nil, false, fmt.Errorf("unionTransformers: unable to transform from storage") } -func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, err error) { +func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) (out []byte, err error) { return u[0].TransformToStorage(ctx, data, dataCtx) } @@ -815,7 +815,7 @@ func computeEncryptionConfigHash(data []byte) string { return fmt.Sprintf("%x", sha256.Sum256(data)) } -var _ ResourceTransformers = &DynamicTransformers{} +var _ storagevalue.ResourceTransformers = &DynamicTransformers{} var _ healthz.HealthChecker = &DynamicTransformers{} // DynamicTransformers holds transformers that may be dynamically updated via a single external actor, likely a controller. @@ -825,7 +825,7 @@ type DynamicTransformers struct { } type transformTracker struct { - transformerOverrides map[schema.GroupResource]value.Transformer + transformerOverrides map[schema.GroupResource]storagevalue.Transformer kmsPluginHealthzCheck healthz.HealthChecker closeTransformers context.CancelFunc kmsCloseGracePeriod time.Duration @@ -833,7 +833,7 @@ type transformTracker struct { // NewDynamicTransformers returns transformers, health checks for kms providers and an ability to close transformers. func NewDynamicTransformers( - transformerOverrides map[schema.GroupResource]value.Transformer, + transformerOverrides map[schema.GroupResource]storagevalue.Transformer, kmsPluginHealthzCheck healthz.HealthChecker, closeTransformers context.CancelFunc, kmsCloseGracePeriod time.Duration, @@ -864,7 +864,7 @@ func (d *DynamicTransformers) Name() string { } // TransformerForResource returns the transformer for the given resource. -func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer { +func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) storagevalue.Transformer { return &resourceTransformer{ resource: resource, transformTracker: d.transformTracker, @@ -873,7 +873,7 @@ func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResour // Set sets the transformer overrides. This method is not go routine safe and must only be called by the same, single caller throughout the lifetime of this object. func (d *DynamicTransformers) Set( - transformerOverrides map[schema.GroupResource]value.Transformer, + transformerOverrides map[schema.GroupResource]storagevalue.Transformer, closeTransformers context.CancelFunc, kmsPluginHealthzCheck healthz.HealthChecker, kmsCloseGracePeriod time.Duration, @@ -898,34 +898,30 @@ func (d *DynamicTransformers) Set( }() } -var _ value.Transformer = &resourceTransformer{} +var _ storagevalue.Transformer = &resourceTransformer{} type resourceTransformer struct { resource schema.GroupResource transformTracker *atomic.Value } -func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { +func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) ([]byte, bool, error) { return r.transformer().TransformFromStorage(ctx, data, dataCtx) } -func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { +func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) ([]byte, error) { return r.transformer().TransformToStorage(ctx, data, dataCtx) } -func (r *resourceTransformer) transformer() value.Transformer { +func (r *resourceTransformer) transformer() storagevalue.Transformer { return transformerFromOverrides(r.transformTracker.Load().(*transformTracker).transformerOverrides, r.resource) } -type ResourceTransformers interface { - TransformerForResource(resource schema.GroupResource) value.Transformer -} +var _ storagevalue.ResourceTransformers = &StaticTransformers{} -var _ ResourceTransformers = &StaticTransformers{} +type StaticTransformers map[schema.GroupResource]storagevalue.Transformer -type StaticTransformers map[schema.GroupResource]value.Transformer - -func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer { +func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) storagevalue.Transformer { return transformerFromOverrides(s, resource) } @@ -934,7 +930,7 @@ var anyGroupAnyResource = schema.GroupResource{ Resource: "*", } -func transformerFromOverrides(transformerOverrides map[schema.GroupResource]value.Transformer, resource schema.GroupResource) value.Transformer { +func transformerFromOverrides(transformerOverrides map[schema.GroupResource]storagevalue.Transformer, resource schema.GroupResource) storagevalue.Transformer { if transformer := transformerOverrides[resource]; transformer != nil { return transformer } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 26e1701b5ce..9f4402411ef 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -38,6 +38,7 @@ import ( serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" + storagevalue "k8s.io/apiserver/pkg/storage/value" "k8s.io/klog/v2" ) @@ -63,11 +64,6 @@ type EtcdOptions struct { // WatchCacheSizes represents override to a given resource WatchCacheSizes []string - // complete guards fields that must be initialized via Complete before the Apply methods can be used. - complete bool - resourceTransformers encryptionconfig.ResourceTransformers - kmsPluginHealthzChecks []healthz.HealthChecker - // SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints. // This allows multiple invocations of the Apply methods without duplication of said endpoints. SkipHealthEndpoints bool @@ -211,94 +207,18 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } -// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting -// up objects that must be created once and reused across multiple invocations such as storage transformers. -// This method mutates the receiver (EtcdOptions). It must never mutate the inputs. -func (s *EtcdOptions) Complete( - stopCh <-chan struct{}, - addPostStartHook func(name string, hook server.PostStartHookFunc) error, -) error { - if s == nil { - return nil - } - - if s.complete { - return fmt.Errorf("EtcdOptions.Complete called more than once") - } - - if len(s.EncryptionProviderConfigFilepath) != 0 { - ctxServer := wait.ContextForChannel(stopCh) - // nolint:govet // The only code path where closeTransformers does not get called is when it gets stored in dynamicTransformers. - ctxTransformers, closeTransformers := context.WithCancel(ctxServer) - - encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload) - if err != nil { - // in case of error, we want to close partially initialized (if any) transformers - closeTransformers() - return err - } - - // enable kms hot reload controller only if the config file is set to be automatically reloaded - if s.EncryptionProviderConfigAutomaticReload { - // with reload=true we will always have 1 health check - if len(encryptionConfiguration.HealthChecks) != 1 { - // in case of error, we want to close partially initialized (if any) transformers - closeTransformers() - return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled") - } - - // Here the dynamic transformers take ownership of the transformers and their cancellation. - dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod) - - // add post start hook to start hot reload controller - // adding this hook here will ensure that it gets configured exactly once - err = addPostStartHook( - "start-encryption-provider-config-automatic-reload", - func(_ server.PostStartHookContext) error { - dynamicEncryptionConfigController := encryptionconfigcontroller.NewDynamicEncryptionConfiguration( - "encryption-provider-config-automatic-reload-controller", - s.EncryptionProviderConfigFilepath, - dynamicTransformers, - encryptionConfiguration.EncryptionFileContentHash, - ) - - go dynamicEncryptionConfigController.Run(ctxServer) - - return nil - }, - ) - if err != nil { - // in case of error, we want to close partially initialized (if any) transformers - closeTransformers() - return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err) - } - - s.resourceTransformers = dynamicTransformers - s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers} - } else { - s.resourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers) - s.kmsPluginHealthzChecks = encryptionConfiguration.HealthChecks - } - } - - s.complete = true - - // nolint:govet // The only code path where closeTransformers does not get called is when it gets stored in dynamicTransformers. - return nil -} - // ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). func (s *EtcdOptions) ApplyTo(c *server.Config) error { if s == nil { return nil } - storageConfig := s.StorageConfig - if storageConfig.StorageObjectCountTracker == nil { - storageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker + storageConfigCopy := s.StorageConfig + if storageConfigCopy.StorageObjectCountTracker == nil { + storageConfigCopy.StorageObjectCountTracker = c.StorageObjectCountTracker } - return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: storageConfig}, c) + return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: storageConfigCopy}, c) } // ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). @@ -307,24 +227,94 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac return nil } - if !s.complete { - return fmt.Errorf("EtcdOptions.Apply called without completion") - } - if !s.SkipHealthEndpoints { if err := s.addEtcdHealthEndpoint(c); err != nil { return err } } - if s.resourceTransformers != nil { + // setup encryption + if err := s.maybeApplyResourceTransformers(c); err != nil { + return err + } + + c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers) + return nil +} + +func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter { + if resourceTransformers != nil { factory = &transformerStorageFactory{ delegate: factory, - resourceTransformers: s.resourceTransformers, + resourceTransformers: resourceTransformers, + } + } + return &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} +} + +func (s *EtcdOptions) maybeApplyResourceTransformers(c *server.Config) (err error) { + if c.ResourceTransformers != nil { + return nil + } + if len(s.EncryptionProviderConfigFilepath) == 0 { + return nil + } + + ctxServer := wait.ContextForChannel(c.DrainedNotify()) + ctxTransformers, closeTransformers := context.WithCancel(ctxServer) + defer func() { + // in case of error, we want to close partially initialized (if any) transformers + if err != nil { + closeTransformers() + } + }() + + encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload) + if err != nil { + return err + } + + if s.EncryptionProviderConfigAutomaticReload { + // with reload=true we will always have 1 health check + if len(encryptionConfiguration.HealthChecks) != 1 { + return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled") + } + + // Here the dynamic transformers take ownership of the transformers and their cancellation. + dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod) + + // add post start hook to start hot reload controller + // adding this hook here will ensure that it gets configured exactly once + err = c.AddPostStartHook( + "start-encryption-provider-config-automatic-reload", + func(_ server.PostStartHookContext) error { + dynamicEncryptionConfigController := encryptionconfigcontroller.NewDynamicEncryptionConfiguration( + "encryption-provider-config-automatic-reload-controller", + s.EncryptionProviderConfigFilepath, + dynamicTransformers, + encryptionConfiguration.EncryptionFileContentHash, + ) + + go dynamicEncryptionConfigController.Run(ctxServer) + + return nil + }, + ) + if err != nil { + return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err) + } + + c.ResourceTransformers = dynamicTransformers + if !s.SkipHealthEndpoints { + c.AddHealthChecks(dynamicTransformers) + } + } else { + c.ResourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers) + if !s.SkipHealthEndpoints { + c.AddHealthChecks(encryptionConfiguration.HealthChecks...) } } - c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil } @@ -345,8 +335,6 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { return readyCheck() })) - c.AddHealthChecks(s.kmsPluginHealthzChecks...) - return nil } @@ -454,7 +442,7 @@ var _ serverstorage.StorageFactory = &transformerStorageFactory{} type transformerStorageFactory struct { delegate serverstorage.StorageFactory - resourceTransformers encryptionconfig.ResourceTransformers + resourceTransformers storagevalue.ResourceTransformers } func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go index 06434e0f615..d10474248b4 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go @@ -306,9 +306,6 @@ func TestKMSHealthzEndpoint(t *testing.T) { EncryptionProviderConfigAutomaticReload: tc.reload, SkipHealthEndpoints: tc.skipHealth, } - if err := etcdOptions.Complete(serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil { - t.Fatal(err) - } if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } @@ -345,9 +342,6 @@ func TestReadinessCheck(t *testing.T) { t.Run(tc.name, func(t *testing.T) { serverConfig := server.NewConfig(codecs) etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth} - if err := etcdOptions.Complete(serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil { - t.Fatal(err) - } if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 82e4b955013..69f8fb51556 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -101,9 +101,6 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) { // ApplyTo adds RecommendedOptions to the server configuration. // pluginInitializers can be empty, it is only need for additional initializers. func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { - if err := o.Etcd.Complete(config.Config.DrainedNotify(), config.Config.AddPostStartHook); err != nil { - return err - } if err := o.Etcd.ApplyTo(&config.Config); err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go index a6a4aa184d6..cb48ed50c71 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go @@ -23,6 +23,7 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/errors" ) @@ -50,6 +51,11 @@ type Transformer interface { TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) } +// ResourceTransformers returns a transformer for the provided resource. +type ResourceTransformers interface { + TransformerForResource(resource schema.GroupResource) Transformer +} + // DefaultContext is a simple implementation of Context for a slice of bytes. type DefaultContext []byte