From f507bc255382b2e2095351053bc17e74f7100d35 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Mon, 29 Aug 2022 17:25:48 -0400 Subject: [PATCH] Load encryption config once This change updates the API server code to load the encryption config once at start up instead of multiple times. Previously the code would set up the storage transformers and the etcd healthz checks in separate parse steps. This is problematic for KMS v2 key ID based staleness checks which need to be able to assert that the API server has a single view into the KMS plugin's current key ID. Signed-off-by: Monis Khan --- cmd/kube-apiserver/app/aggregator.go | 8 +- cmd/kube-apiserver/app/apiextensions.go | 8 +- cmd/kube-apiserver/app/server.go | 27 ++- pkg/controlplane/instance_test.go | 3 + .../default_storage_factory_builder.go | 34 +--- pkg/registry/registrytest/etcd.go | 11 +- .../pkg/cmd/server/testing/testserver.go | 15 +- .../test/integration/fixtures/server.go | 27 ++- .../test/integration/pruning_test.go | 9 +- .../pkg/apis/config/validation/validation.go | 6 +- .../server/options/encryptionconfig/config.go | 18 +- .../options/encryptionconfig/config_test.go | 6 +- .../apiserver/pkg/server/options/etcd.go | 190 +++++++++++------- .../apiserver/pkg/server/options/etcd_test.go | 27 ++- .../pkg/server/options/recommended.go | 3 + .../pkg/server/storage/storage_factory.go | 34 ++-- .../storage/storagebackend/factory/etcd3.go | 4 +- .../pkg/storage/tests/cacher_test.go | 4 +- .../value/encrypt/identity/identity.go | 12 +- .../pkg/storage/value/transformer.go | 46 +---- .../kmsv2_transformation_test.go | 86 ++++++++ 21 files changed, 348 insertions(+), 230 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 1fb6180d606..4025281e1fc 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -37,7 +37,6 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" - genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -91,11 +90,16 @@ func createAggregatorConfig( } // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. etcdOptions := *commandOptions.Etcd etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIListChunking) etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion) etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) - genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { + return nil, err + } // override MergedResourceConfig with aggregator defaults and registry if err := commandOptions.APIEnablement.ApplyTo( diff --git a/cmd/kube-apiserver/app/apiextensions.go b/cmd/kube-apiserver/app/apiextensions.go index 24302216f37..9fb9c4776b7 100644 --- a/cmd/kube-apiserver/app/apiextensions.go +++ b/cmd/kube-apiserver/app/apiextensions.go @@ -29,7 +29,6 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" - genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" kubeexternalinformers "k8s.io/client-go/informers" @@ -64,13 +63,18 @@ func createAPIExtensionsConfig( } // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. etcdOptions := *commandOptions.Etcd etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) // this is where the true decodable levels come from. etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion) // prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) - genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { + return nil, err + } // override MergedResourceConfig with apiextensions defaults and registry if err := commandOptions.APIEnablement.ApplyTo( diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 1a3b0e9ee0e..fc36d044dbe 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -397,24 +397,23 @@ func buildGenericConfig( kubeVersion := version.Get() genericConfig.Version = &kubeVersion - storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() - storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig - completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd) - if err != nil { - lastErr = err - return - } - storageFactory, lastErr = completedStorageFactoryConfig.New(genericConfig.DrainedNotify()) - if lastErr != nil { - return - } if genericConfig.EgressSelector != nil { - storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup + s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup } if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { - storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider + s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider } else { - storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() + s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() + } + if lastErr = s.Etcd.Complete(genericConfig.StorageObjectCountTracker, genericConfig.DrainedNotify()); lastErr != nil { + return + } + + storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() + storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig + storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New() + if lastErr != nil { + return } if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return diff --git a/pkg/controlplane/instance_test.go b/pkg/controlplane/instance_test.go index f677ae12cb4..691f2546153 100644 --- a/pkg/controlplane/instance_test.go +++ b/pkg/controlplane/instance_test.go @@ -89,6 +89,9 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio etcdOptions := options.NewEtcdOptions(storageConfig) // unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests etcdOptions.EnableWatchCache = false + if err := etcdOptions.Complete(config.GenericConfig.StorageObjectCountTracker, config.GenericConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig) if err != nil { t.Fatal(err) diff --git a/pkg/kubeapiserver/default_storage_factory_builder.go b/pkg/kubeapiserver/default_storage_factory_builder.go index 1ad90354b8b..fb66d0c1f32 100644 --- a/pkg/kubeapiserver/default_storage_factory_builder.go +++ b/pkg/kubeapiserver/default_storage_factory_builder.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" serveroptions "k8s.io/apiserver/pkg/server/options" - "k8s.io/apiserver/pkg/server/options/encryptionconfig" "k8s.io/apiserver/pkg/server/resourceconfig" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -58,7 +57,6 @@ func DefaultWatchCacheSizes() map[schema.GroupResource]int { // NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides. func NewStorageFactoryConfig() *StorageFactoryConfig { - resources := []schema.GroupVersionResource{ // If a resource has to be stored in a version that is not the // latest, then it can be listed here. Usually this is the case @@ -83,23 +81,22 @@ func NewStorageFactoryConfig() *StorageFactoryConfig { // StorageFactoryConfig is a configuration for creating storage factory. type StorageFactoryConfig struct { - StorageConfig storagebackend.Config - APIResourceConfig *serverstorage.ResourceConfig - DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig - DefaultStorageMediaType string - Serializer runtime.StorageSerializer - ResourceEncodingOverrides []schema.GroupVersionResource - EtcdServersOverrides []string - EncryptionProviderConfigFilepath string + StorageConfig storagebackend.Config + APIResourceConfig *serverstorage.ResourceConfig + DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig + DefaultStorageMediaType string + Serializer runtime.StorageSerializer + ResourceEncodingOverrides []schema.GroupVersionResource + EtcdServersOverrides []string } // Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig. -func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) { +// This method mutates the receiver (StorageFactoryConfig). It must never mutate the inputs. +func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) *completedStorageFactoryConfig { c.StorageConfig = etcdOptions.StorageConfig c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides - c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath - return &completedStorageFactoryConfig{c}, nil + return &completedStorageFactoryConfig{c} } // completedStorageFactoryConfig is a wrapper around StorageFactoryConfig completed with etcd options. @@ -111,7 +108,7 @@ type completedStorageFactoryConfig struct { } // New returns a new storage factory created from the completed storage factory configuration. -func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstorage.DefaultStorageFactory, error) { +func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) { resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides) storageFactory := serverstorage.NewDefaultStorageFactory( c.StorageConfig, @@ -141,14 +138,5 @@ func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstor servers := strings.Split(tokens[1], ";") storageFactory.SetEtcdLocation(groupResource, servers) } - if len(c.EncryptionProviderConfigFilepath) != 0 { - transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath, stopCh) - if err != nil { - return nil, err - } - for groupResource, transformer := range transformerOverrides { - storageFactory.SetTransformer(groupResource, transformer) - } - } return storageFactory, nil } diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index e635f530a60..ba2fb1e194b 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -17,7 +17,6 @@ limitations under the License. package registrytest import ( - "context" "testing" "k8s.io/apimachinery/pkg/runtime/schema" @@ -36,18 +35,12 @@ func NewEtcdStorage(t *testing.T, group string) (*storagebackend.ConfigForResour func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) { t.Helper() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) options := options.NewEtcdOptions(config) - completedConfig, err := kubeapiserver.NewStorageFactoryConfig().Complete(options) - if err != nil { - t.Fatal(err) - } + completedConfig := kubeapiserver.NewStorageFactoryConfig().Complete(options) completedConfig.APIResourceConfig = serverstorage.NewResourceConfig() - factory, err := completedConfig.New(ctx.Done()) + factory, err := completedConfig.New() if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index af72d9ce7ed..e7ebf78beda 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -28,13 +28,13 @@ import ( "github.com/spf13/pflag" + "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - "k8s.io/klog/v2" ) @@ -47,10 +47,11 @@ type TestServerInstanceOptions struct { // TestServer return values supplied by kube-test-ApiServer type TestServer struct { - ClientConfig *restclient.Config // Rest client config - ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts - TearDownFn TearDownFunc // TearDown function - TmpDir string // Temp Dir used, by the apiserver + ClientConfig *restclient.Config // Rest client config + ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts + TearDownFn TearDownFunc // TearDown function + TmpDir string // Temp Dir used, by the apiserver + CompletedConfig apiserver.CompletedConfig } // Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie @@ -144,7 +145,8 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin if err != nil { return result, fmt.Errorf("failed to create config from options: %v", err) } - server, err := config.Complete().New(genericapiserver.NewEmptyDelegate()) + completedConfig := config.Complete() + server, err := completedConfig.New(genericapiserver.NewEmptyDelegate()) if err != nil { return result, fmt.Errorf("failed to create server: %v", err) } @@ -187,6 +189,7 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig result.ServerOpts = s result.TearDownFn = tearDown + result.CompletedConfig = completedConfig return result, nil } diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go index ad6bfe0a7d8..f330e0d65e5 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go @@ -26,21 +26,38 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" - serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) // StartDefaultServer starts a test server. func StartDefaultServer(t servertesting.Logger, flags ...string) (func(), *rest.Config, *serveroptions.CustomResourceDefinitionsServerOptions, error) { + tearDownFn, s, err := startDefaultServer(t, flags...) + if err != nil { + return nil, nil, nil, err + } + return tearDownFn, s.ClientConfig, s.ServerOpts, nil +} + +func StartDefaultServerWithConfigAccess(t servertesting.Logger, flags ...string) (func(), *rest.Config, apiserver.CompletedConfig, error) { + tearDownFn, s, err := startDefaultServer(t, flags...) + if err != nil { + return nil, nil, apiserver.CompletedConfig{}, err + } + return tearDownFn, s.ClientConfig, s.CompletedConfig, nil +} + +func startDefaultServer(t servertesting.Logger, flags ...string) (func(), servertesting.TestServer, error) { // create kubeconfig which will not actually be used. But authz/authn needs it to startup. fakeKubeConfig, err := ioutil.TempFile("", "kubeconfig") if err != nil { - return nil, nil, nil, err + return nil, servertesting.TestServer{}, err } fakeKubeConfig.WriteString(` apiVersion: v1 @@ -75,7 +92,7 @@ users: ), nil) if err != nil { os.Remove(fakeKubeConfig.Name()) - return nil, nil, nil, err + return nil, servertesting.TestServer{}, err } tearDownFn := func() { @@ -83,7 +100,7 @@ users: s.TearDownFn() } - return tearDownFn, s.ClientConfig, s.ServerOpts, nil + return tearDownFn, s, nil } // StartDefaultServerWithClients starts a test server and returns clients for it. diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go index 2101e6cb434..eb8f38925a9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go @@ -298,7 +298,7 @@ func TestPruningStatus(t *testing.T) { } func TestPruningFromStorage(t *testing.T) { - tearDown, config, options, err := fixtures.StartDefaultServer(t) + tearDown, config, completedConfig, err := fixtures.StartDefaultServerWithConfigAccess(t) if err != nil { t.Fatal(err) } @@ -314,11 +314,6 @@ func TestPruningFromStorage(t *testing.T) { t.Fatal(err) } - serverConfig, err := options.Config() - if err != nil { - t.Fatal(err) - } - crd := pruningFixture.DeepCopy() crd.Spec.Versions[0].Schema = &apiextensionsv1.CustomResourceValidation{} if err := yaml.Unmarshal([]byte(fooSchema), &crd.Spec.Versions[0].Schema.OpenAPIV3Schema); err != nil { @@ -330,7 +325,7 @@ func TestPruningFromStorage(t *testing.T) { t.Fatal(err) } - restOptions, err := serverConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) + restOptions, err := completedConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go b/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go index 866e6149614..fc30efa92fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go +++ b/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go @@ -43,10 +43,12 @@ const ( ) var ( - aesKeySizes = []int{16, 24, 32} // See https://golang.org/pkg/crypto/aes/#NewCipher for details on supported key sizes for AES. - secretBoxKeySizes = []int{32} + aesKeySizes = []int{16, 24, 32} + // See https://godoc.org/golang.org/x/crypto/nacl/secretbox#Open for details on the supported key sizes for Secretbox. + secretBoxKeySizes = []int{32} + root = field.NewPath("resources") ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index bb6c948614c..af750674200 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -94,16 +94,6 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker { }) } -func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) { - _, kmsHealthChecks, err := LoadEncryptionConfig(filepath, stopCh) - return kmsHealthChecks, err -} - -func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) { - transformers, _, err := LoadEncryptionConfig(filepath, stopCh) - return transformers, err -} - func LoadEncryptionConfig(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) { config, err := loadConfig(filepath) if err != nil { @@ -159,7 +149,7 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio for gr, transList := range resourceToPrefixTransformer { gr := gr transList := transList - transformers[gr] = value.NewMutableTransformer(value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)) + transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...) } return transformers, probes, nil @@ -425,8 +415,8 @@ var ( // The factory to create kms service. This is to make writing test easier. envelopeServiceFactory = envelope.NewGRPCService - // The factory to create kmsv2 service. - envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService + // The factory to create kmsv2 service. Exported for integration tests. + EnvelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService ) func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, error) { @@ -458,7 +448,7 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName) } - envelopeService, err := envelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration) + envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration) if err != nil { return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", kmsName, err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go index edff1c2896a..15c0e93cb95 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go @@ -163,12 +163,12 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() // Set factory for mock envelope service factory := envelopeServiceFactory - factoryKMSv2 := envelopeKMSv2ServiceFactory + factoryKMSv2 := EnvelopeKMSv2ServiceFactory envelopeServiceFactory = newMockEnvelopeService - envelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service + EnvelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service defer func() { envelopeServiceFactory = factory - envelopeKMSv2ServiceFactory = factoryKMSv2 + EnvelopeKMSv2ServiceFactory = factoryKMSv2 }() ctx := testContext(t) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 380d44d94bf..6a0a5891410 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/apiserver/pkg/storage/value" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" ) @@ -59,6 +60,15 @@ type EtcdOptions struct { DefaultWatchCacheSize int // WatchCacheSizes represents override to a given resource WatchCacheSizes []string + + // complete guards fields that must be initialized via Complete before the Apply methods can be used. + complete bool + transformerOverrides map[schema.GroupResource]value.Transformer + kmsPluginHealthzChecks []healthz.HealthChecker + + // SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints. + // This allows multiple invocations of the Apply methods without duplication of said endpoints. + SkipHealthEndpoints bool } var storageTypes = sets.NewString( @@ -190,39 +200,65 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } +// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting +// up objects that must be created once and reused across multiple invocations such as storage transformers. +// This method mutates the receiver (EtcdOptions). It must never mutate the inputs. +func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error { + if s == nil { + return nil + } + + if s.complete { + return fmt.Errorf("EtcdOptions.Complete called more than once") + } + + if len(s.EncryptionProviderConfigFilepath) != 0 { + transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, stopCh) + if err != nil { + return err + } + s.transformerOverrides = transformerOverrides + s.kmsPluginHealthzChecks = kmsPluginHealthzChecks + } + + s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker + + s.complete = true + + return nil +} + +// ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). func (s *EtcdOptions) ApplyTo(c *server.Config) error { if s == nil { return nil } - if err := s.addEtcdHealthEndpoint(c); err != nil { - return err + + return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: s.StorageConfig}, c) +} + +// ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). +func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { + if s == nil { + return nil } - transformerOverrides := make(map[schema.GroupResource]value.Transformer) - if len(s.EncryptionProviderConfigFilepath) > 0 { - var err error - transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) - if err != nil { + + if !s.complete { + return fmt.Errorf("EtcdOptions.Apply called without completion") + } + + if !s.SkipHealthEndpoints { + if err := s.addEtcdHealthEndpoint(c); err != nil { return err } } - // use the StorageObjectCountTracker interface instance from server.Config - s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker - - c.RESTOptionsGetter = &SimpleRestOptionsFactory{ - Options: *s, - TransformerOverrides: transformerOverrides, + if len(s.transformerOverrides) > 0 { + factory = &transformerStorageFactory{ + delegate: factory, + transformerOverrides: s.transformerOverrides, + } } - return nil -} - -func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { - if err := s.addEtcdHealthEndpoint(c); err != nil { - return err - } - - // use the StorageObjectCountTracker interface instance from server.Config - s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil @@ -245,57 +281,11 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { return readyCheck() })) - if s.EncryptionProviderConfigFilepath != "" { - kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) - if err != nil { - return err - } - c.AddHealthChecks(kmsPluginHealthzChecks...) - } + c.AddHealthChecks(s.kmsPluginHealthzChecks...) return nil } -type SimpleRestOptionsFactory struct { - Options EtcdOptions - TransformerOverrides map[schema.GroupResource]value.Transformer -} - -func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { - ret := generic.RESTOptions{ - StorageConfig: f.Options.StorageConfig.ForResource(resource), - Decorator: generic.UndecoratedStorage, - EnableGarbageCollection: f.Options.EnableGarbageCollection, - DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, - ResourcePrefix: resource.Group + "/" + resource.Resource, - CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, - StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, - } - if f.TransformerOverrides != nil { - if transformer, ok := f.TransformerOverrides[resource]; ok { - ret.StorageConfig.Transformer = transformer - } - } - if f.Options.EnableWatchCache { - sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) - if err != nil { - return generic.RESTOptions{}, err - } - size, ok := sizes[resource] - if ok && size > 0 { - klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) - } - if ok && size <= 0 { - klog.V(3).InfoS("Not using watch cache", "resource", resource) - ret.Decorator = generic.UndecoratedStorage - } else { - klog.V(3).InfoS("Using watch cache", "resource", resource) - ret.Decorator = genericregistry.StorageWithCacher() - } - } - return ret, nil -} - type StorageFactoryRestOptionsFactory struct { Options EtcdOptions StorageFactory serverstorage.StorageFactory @@ -316,6 +306,7 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, } + if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) if err != nil { @@ -326,8 +317,10 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) } if ok && size <= 0 { + klog.V(3).InfoS("Not using watch cache", "resource", resource) ret.Decorator = generic.UndecoratedStorage } else { + klog.V(3).InfoS("Using watch cache", "resource", resource) ret.Decorator = genericregistry.StorageWithCacher() } } @@ -369,3 +362,60 @@ func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]strin } return cacheSizes, nil } + +var _ serverstorage.StorageFactory = &SimpleStorageFactory{} + +// SimpleStorageFactory provides a StorageFactory implementation that should be used when different +// resources essentially share the same storage config (as defined by the given storagebackend.Config). +// It assumes the resources are stored at a path that is purely based on the schema.GroupResource. +// Users that need flexibility and per resource overrides should use DefaultStorageFactory instead. +type SimpleStorageFactory struct { + StorageConfig storagebackend.Config +} + +func (s *SimpleStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) { + return s.StorageConfig.ForResource(resource), nil +} + +func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) string { + return resource.Group + "/" + resource.Resource +} + +func (s *SimpleStorageFactory) Backends() []serverstorage.Backend { + // nothing should ever call this method but we still provide a functional implementation + return serverstorage.Backends(s.StorageConfig) +} + +var _ serverstorage.StorageFactory = &transformerStorageFactory{} + +type transformerStorageFactory struct { + delegate serverstorage.StorageFactory + transformerOverrides map[schema.GroupResource]value.Transformer +} + +func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) { + config, err := t.delegate.NewConfig(resource) + if err != nil { + return nil, err + } + + transformer, ok := t.transformerOverrides[resource] + if !ok { + return config, nil + } + + configCopy := *config + resourceConfig := configCopy.Config + resourceConfig.Transformer = transformer + configCopy.Config = resourceConfig + + return &configCopy, nil +} + +func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource) string { + return t.delegate.ResourcePrefix(resource) +} + +func (t *transformerStorageFactory) Backends() []serverstorage.Backend { + return t.delegate.Backends() +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go index 0d380f0762d..6fac3d53b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go @@ -204,6 +204,7 @@ func TestKMSHealthzEndpoint(t *testing.T) { name string encryptionConfigPath string wantChecks []string + skipHealth bool }{ { name: "single kms-provider, expect single kms healthz check", @@ -215,6 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml", wantChecks: []string{"etcd", "kms-provider-0", "kms-provider-1"}, }, + { + name: "two kms-providers with skip, expect zero kms healthz checks", + encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml", + wantChecks: nil, + skipHealth: true, + }, } scheme := runtime.NewScheme() @@ -225,8 +232,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { serverConfig := server.NewConfig(codecs) etcdOptions := &EtcdOptions{ EncryptionProviderConfigFilepath: tc.encryptionConfigPath, + SkipHealthEndpoints: tc.skipHealth, } - if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil { + if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } + if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } @@ -244,12 +255,19 @@ func TestReadinessCheck(t *testing.T) { name string wantReadyzChecks []string wantHealthzChecks []string + skipHealth bool }{ { name: "Readyz should have etcd-readiness check", wantReadyzChecks: []string{"etcd", "etcd-readiness"}, wantHealthzChecks: []string{"etcd"}, }, + { + name: "skip health, Readyz should not have etcd-readiness check", + wantReadyzChecks: nil, + wantHealthzChecks: nil, + skipHealth: true, + }, } scheme := runtime.NewScheme() @@ -258,8 +276,11 @@ func TestReadinessCheck(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { serverConfig := server.NewConfig(codecs) - etcdOptions := &EtcdOptions{} - if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil { + etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth} + if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } + if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 5746baa02d2..4fc2d5b9905 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -101,6 +101,9 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) { // ApplyTo adds RecommendedOptions to the server configuration. // pluginInitializers can be empty, it is only need for additional initializers. func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { + if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil { + return err + } if err := o.Etcd.ApplyTo(&config.Config); err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index f2f69883ae0..5b1c24446c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage/storagebackend" - "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" ) @@ -112,8 +111,6 @@ type groupResourceOverrides struct { // decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of // returned decoders will be priority for attempt to decode. decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder - // transformer is optional and shall encrypt that resource at rest. - transformer value.Transformer // disablePaging will prevent paging on the provided resource. disablePaging bool } @@ -139,9 +136,6 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St if o.decoderDecoratorFn != nil { options.DecoderDecoratorFn = o.decoderDecoratorFn } - if o.transformer != nil { - config.Transformer = o.transformer - } if o.disablePaging { config.Paging = false } @@ -210,12 +204,6 @@ func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource s.Overrides[groupResource] = overrides } -func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) { - overrides := s.Overrides[groupResource] - overrides.transformer = transformer - s.Overrides[groupResource] = overrides -} - // AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) { for _, groupResource := range groupResources { @@ -291,25 +279,35 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (* // Backends returns all backends for all registered storage destinations. // Used for getting all instances for health validations. func (s *DefaultStorageFactory) Backends() []Backend { - servers := sets.NewString(s.StorageConfig.Transport.ServerList...) + return backends(s.StorageConfig, s.Overrides) +} - for _, overrides := range s.Overrides { +// Backends returns all backends for all registered storage destinations. +// Used for getting all instances for health validations. +func Backends(storageConfig storagebackend.Config) []Backend { + return backends(storageConfig, nil) +} + +func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend { + servers := sets.NewString(storageConfig.Transport.ServerList...) + + for _, overrides := range grOverrides { servers.Insert(overrides.etcdLocation...) } tlsConfig := &tls.Config{ InsecureSkipVerify: true, } - if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 { - cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile) + if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 { + cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile) if err != nil { klog.Errorf("failed to load key pair while getting backends: %s", err) } else { tlsConfig.Certificates = []tls.Certificate{cert} } } - if len(s.StorageConfig.Transport.TrustedCAFile) > 0 { - if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil { + if len(storageConfig.Transport.TrustedCAFile) > 0 { + if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil { klog.Errorf("failed to read ca file while getting backends: %s", err) } else { caPool := x509.NewCertPool() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index e23c360b8eb..c1785964956 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -48,7 +48,7 @@ import ( "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" - "k8s.io/apiserver/pkg/storage/value" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics/legacyregistry" tracing "k8s.io/component-base/tracing" @@ -395,7 +395,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. } transformer := c.Transformer if transformer == nil { - transformer = value.IdentityTransformer + transformer = identity.NewEncryptCheckTransformer() } return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 0966d234a99..4a3b1ddaa4f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -48,7 +48,7 @@ import ( "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" - "k8s.io/apiserver/pkg/storage/value" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" @@ -107,7 +107,7 @@ func newPodList() runtime.Object { return &example.PodList{} } func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) return server, storage } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go index 7bdd5f6823e..8d967d70681 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go @@ -24,6 +24,12 @@ import ( "k8s.io/apiserver/pkg/storage/value" ) +var ( + transformer = identityTransformer{} + encryptedPrefix = []byte("k8s:enc:") + errEncryptedData = fmt.Errorf("identity transformer tried to read encrypted data") +) + // identityTransformer performs no transformation on provided data, but validates // that the data is not encrypted data during TransformFromStorage type identityTransformer struct{} @@ -31,7 +37,7 @@ type identityTransformer struct{} // NewEncryptCheckTransformer returns an identityTransformer which returns an error // on attempts to read encrypted data func NewEncryptCheckTransformer() value.Transformer { - return identityTransformer{} + return transformer } // TransformFromStorage returns the input bytes if the data is not encrypted @@ -39,8 +45,8 @@ func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte // identityTransformer has to return an error if the data is encoded using another transformer. // JSON data starts with '{'. Protobuf data has a prefix 'k8s[\x00-\xFF]'. // Prefix 'k8s:enc:' is reserved for encrypted data on disk. - if bytes.HasPrefix(data, []byte("k8s:enc:")) { - return []byte{}, false, fmt.Errorf("identity transformer tried to read encrypted data") + if bytes.HasPrefix(data, encryptedPrefix) { + return nil, false, errEncryptedData } return data, false, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go index 3c0ca7a60da..970d8d36288 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "fmt" - "sync" "time" "k8s.io/apimachinery/pkg/util/errors" @@ -51,54 +50,11 @@ type Transformer interface { TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) } -type identityTransformer struct{} - -// IdentityTransformer performs no transformation of the provided data. -var IdentityTransformer Transformer = identityTransformer{} - -func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, bool, error) { - return data, false, nil -} -func (identityTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, error) { - return data, nil -} - // DefaultContext is a simple implementation of Context for a slice of bytes. type DefaultContext []byte // AuthenticatedData returns itself. -func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) } - -// MutableTransformer allows a transformer to be changed safely at runtime. -type MutableTransformer struct { - lock sync.RWMutex - transformer Transformer -} - -// NewMutableTransformer creates a transformer that can be updated at any time by calling Set() -func NewMutableTransformer(transformer Transformer) *MutableTransformer { - return &MutableTransformer{transformer: transformer} -} - -// Set updates the nested transformer. -func (t *MutableTransformer) Set(transformer Transformer) { - t.lock.Lock() - t.transformer = transformer - t.lock.Unlock() -} - -func (t *MutableTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, stale bool, err error) { - t.lock.RLock() - transformer := t.transformer - t.lock.RUnlock() - return transformer.TransformFromStorage(ctx, data, dataCtx) -} -func (t *MutableTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) { - t.lock.RLock() - transformer := t.transformer - t.lock.RUnlock() - return transformer.TransformToStorage(ctx, data, dataCtx) -} +func (c DefaultContext) AuthenticatedData() []byte { return c } // PrefixTransformer holds a transformer interface and the prefix that the transformation is located under. type PrefixTransformer struct { diff --git a/test/integration/controlplane/transformation/kmsv2_transformation_test.go b/test/integration/controlplane/transformation/kmsv2_transformation_test.go index d8db16d705d..dfb250a938f 100644 --- a/test/integration/controlplane/transformation/kmsv2_transformation_test.go +++ b/test/integration/controlplane/transformation/kmsv2_transformation_test.go @@ -29,17 +29,25 @@ import ( "time" "github.com/gogo/protobuf/proto" + + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server/options/encryptionconfig" "k8s.io/apiserver/pkg/storage/value" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" + "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1" kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" featuregatetesting "k8s.io/component-base/featuregate/testing" kmsv2api "k8s.io/kms/apis/v2alpha1" + "k8s.io/kubernetes/test/integration/etcd" ) type envelopekmsv2 struct { @@ -273,3 +281,81 @@ resources: mustBeHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig) mustBeUnHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig) } + +func TestKMSv2SingleService(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() + + var kmsv2Calls int + origEnvelopeKMSv2ServiceFactory := encryptionconfig.EnvelopeKMSv2ServiceFactory + encryptionconfig.EnvelopeKMSv2ServiceFactory = func(ctx context.Context, endpoint string, callTimeout time.Duration) (kmsv2.Service, error) { + kmsv2Calls++ + return origEnvelopeKMSv2ServiceFactory(ctx, endpoint, callTimeout) + } + t.Cleanup(func() { + encryptionconfig.EnvelopeKMSv2ServiceFactory = origEnvelopeKMSv2ServiceFactory + }) + + // check resources provided by the three servers that we have wired together + // - pods and config maps from KAS + // - CRDs and CRs from API extensions + // - API services from aggregator + encryptionConfig := ` +kind: EncryptionConfiguration +apiVersion: apiserver.config.k8s.io/v1 +resources: + - resources: + - pods + - configmaps + - customresourcedefinitions.apiextensions.k8s.io + - pandas.awesome.bears.com + - apiservices.apiregistration.k8s.io + providers: + - kms: + apiVersion: v2 + name: kms-provider + cachesize: 1000 + endpoint: unix:///@kms-provider.sock +` + + pluginMock, err := kmsv2mock.NewBase64Plugin("@kms-provider.sock") + if err != nil { + t.Fatalf("failed to create mock of KMSv2 Plugin: %v", err) + } + + go pluginMock.Start() + if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock); err != nil { + t.Fatalf("Failed start plugin, err: %v", err) + } + t.Cleanup(pluginMock.CleanUp) + + test, err := newTransformTest(t, encryptionConfig) + if err != nil { + t.Fatalf("failed to start KUBE API Server with encryptionConfig\n %s, error: %v", encryptionConfig, err) + } + t.Cleanup(test.cleanUp) + + // the storage registry for CRs is dynamic so create one to exercise the wiring + etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(test.kubeAPIServer.ClientConfig), false, etcd.GetCustomResourceDefinitionData()...) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + t.Cleanup(cancel) + + gvr := schema.GroupVersionResource{Group: "awesome.bears.com", Version: "v1", Resource: "pandas"} + stub := etcd.GetEtcdStorageData()[gvr].Stub + dynamicClient, obj, err := etcd.JSONToUnstructured(stub, "", &meta.RESTMapping{ + Resource: gvr, + GroupVersionKind: gvr.GroupVersion().WithKind("Panda"), + Scope: meta.RESTScopeRoot, + }, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig)) + if err != nil { + t.Fatal(err) + } + _, err = dynamicClient.Create(ctx, obj, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + if kmsv2Calls != 1 { + t.Fatalf("expected a single call to KMS v2 service factory: %v", kmsv2Calls) + } +}