diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 1fb6180d606..4025281e1fc 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -37,7 +37,6 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" - genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -91,11 +90,16 @@ func createAggregatorConfig( } // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. etcdOptions := *commandOptions.Etcd etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIListChunking) etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion) etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) - genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { + return nil, err + } // override MergedResourceConfig with aggregator defaults and registry if err := commandOptions.APIEnablement.ApplyTo( diff --git a/cmd/kube-apiserver/app/apiextensions.go b/cmd/kube-apiserver/app/apiextensions.go index 24302216f37..9fb9c4776b7 100644 --- a/cmd/kube-apiserver/app/apiextensions.go +++ b/cmd/kube-apiserver/app/apiextensions.go @@ -29,7 +29,6 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" - genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" kubeexternalinformers "k8s.io/client-go/informers" @@ -64,13 +63,18 @@ func createAPIExtensionsConfig( } // copy the etcd options so we don't mutate originals. + // we assume that the etcd options have been completed already. avoid messing with anything outside + // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. etcdOptions := *commandOptions.Etcd etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) // this is where the true decodable levels come from. etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion) // prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName}) - genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} + etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks + if err := etcdOptions.ApplyTo(&genericConfig); err != nil { + return nil, err + } // override MergedResourceConfig with apiextensions defaults and registry if err := commandOptions.APIEnablement.ApplyTo( diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 1a3b0e9ee0e..fc36d044dbe 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -397,24 +397,23 @@ func buildGenericConfig( kubeVersion := version.Get() genericConfig.Version = &kubeVersion - storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() - storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig - completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd) - if err != nil { - lastErr = err - return - } - storageFactory, lastErr = completedStorageFactoryConfig.New(genericConfig.DrainedNotify()) - if lastErr != nil { - return - } if genericConfig.EgressSelector != nil { - storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup + s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup } if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { - storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider + s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider } else { - storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() + s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() + } + if lastErr = s.Etcd.Complete(genericConfig.StorageObjectCountTracker, genericConfig.DrainedNotify()); lastErr != nil { + return + } + + storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig() + storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig + storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New() + if lastErr != nil { + return } if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return diff --git a/pkg/controlplane/instance_test.go b/pkg/controlplane/instance_test.go index f677ae12cb4..691f2546153 100644 --- a/pkg/controlplane/instance_test.go +++ b/pkg/controlplane/instance_test.go @@ -89,6 +89,9 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio etcdOptions := options.NewEtcdOptions(storageConfig) // unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests etcdOptions.EnableWatchCache = false + if err := etcdOptions.Complete(config.GenericConfig.StorageObjectCountTracker, config.GenericConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig) if err != nil { t.Fatal(err) diff --git a/pkg/kubeapiserver/default_storage_factory_builder.go b/pkg/kubeapiserver/default_storage_factory_builder.go index 1ad90354b8b..fb66d0c1f32 100644 --- a/pkg/kubeapiserver/default_storage_factory_builder.go +++ b/pkg/kubeapiserver/default_storage_factory_builder.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" serveroptions "k8s.io/apiserver/pkg/server/options" - "k8s.io/apiserver/pkg/server/options/encryptionconfig" "k8s.io/apiserver/pkg/server/resourceconfig" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" @@ -58,7 +57,6 @@ func DefaultWatchCacheSizes() map[schema.GroupResource]int { // NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides. func NewStorageFactoryConfig() *StorageFactoryConfig { - resources := []schema.GroupVersionResource{ // If a resource has to be stored in a version that is not the // latest, then it can be listed here. Usually this is the case @@ -83,23 +81,22 @@ func NewStorageFactoryConfig() *StorageFactoryConfig { // StorageFactoryConfig is a configuration for creating storage factory. type StorageFactoryConfig struct { - StorageConfig storagebackend.Config - APIResourceConfig *serverstorage.ResourceConfig - DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig - DefaultStorageMediaType string - Serializer runtime.StorageSerializer - ResourceEncodingOverrides []schema.GroupVersionResource - EtcdServersOverrides []string - EncryptionProviderConfigFilepath string + StorageConfig storagebackend.Config + APIResourceConfig *serverstorage.ResourceConfig + DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig + DefaultStorageMediaType string + Serializer runtime.StorageSerializer + ResourceEncodingOverrides []schema.GroupVersionResource + EtcdServersOverrides []string } // Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig. -func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) { +// This method mutates the receiver (StorageFactoryConfig). It must never mutate the inputs. +func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) *completedStorageFactoryConfig { c.StorageConfig = etcdOptions.StorageConfig c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides - c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath - return &completedStorageFactoryConfig{c}, nil + return &completedStorageFactoryConfig{c} } // completedStorageFactoryConfig is a wrapper around StorageFactoryConfig completed with etcd options. @@ -111,7 +108,7 @@ type completedStorageFactoryConfig struct { } // New returns a new storage factory created from the completed storage factory configuration. -func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstorage.DefaultStorageFactory, error) { +func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) { resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides) storageFactory := serverstorage.NewDefaultStorageFactory( c.StorageConfig, @@ -141,14 +138,5 @@ func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstor servers := strings.Split(tokens[1], ";") storageFactory.SetEtcdLocation(groupResource, servers) } - if len(c.EncryptionProviderConfigFilepath) != 0 { - transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath, stopCh) - if err != nil { - return nil, err - } - for groupResource, transformer := range transformerOverrides { - storageFactory.SetTransformer(groupResource, transformer) - } - } return storageFactory, nil } diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index e635f530a60..ba2fb1e194b 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -17,7 +17,6 @@ limitations under the License. package registrytest import ( - "context" "testing" "k8s.io/apimachinery/pkg/runtime/schema" @@ -36,18 +35,12 @@ func NewEtcdStorage(t *testing.T, group string) (*storagebackend.ConfigForResour func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) { t.Helper() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) options := options.NewEtcdOptions(config) - completedConfig, err := kubeapiserver.NewStorageFactoryConfig().Complete(options) - if err != nil { - t.Fatal(err) - } + completedConfig := kubeapiserver.NewStorageFactoryConfig().Complete(options) completedConfig.APIResourceConfig = serverstorage.NewResourceConfig() - factory, err := completedConfig.New(ctx.Done()) + factory, err := completedConfig.New() if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go index af72d9ce7ed..e7ebf78beda 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go @@ -28,13 +28,13 @@ import ( "github.com/spf13/pflag" + "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - "k8s.io/klog/v2" ) @@ -47,10 +47,11 @@ type TestServerInstanceOptions struct { // TestServer return values supplied by kube-test-ApiServer type TestServer struct { - ClientConfig *restclient.Config // Rest client config - ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts - TearDownFn TearDownFunc // TearDown function - TmpDir string // Temp Dir used, by the apiserver + ClientConfig *restclient.Config // Rest client config + ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts + TearDownFn TearDownFunc // TearDown function + TmpDir string // Temp Dir used, by the apiserver + CompletedConfig apiserver.CompletedConfig } // Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie @@ -144,7 +145,8 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin if err != nil { return result, fmt.Errorf("failed to create config from options: %v", err) } - server, err := config.Complete().New(genericapiserver.NewEmptyDelegate()) + completedConfig := config.Complete() + server, err := completedConfig.New(genericapiserver.NewEmptyDelegate()) if err != nil { return result, fmt.Errorf("failed to create server: %v", err) } @@ -187,6 +189,7 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig result.ServerOpts = s result.TearDownFn = tearDown + result.CompletedConfig = completedConfig return result, nil } diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go index ad6bfe0a7d8..f330e0d65e5 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures/server.go @@ -26,21 +26,38 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" - serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiextensions-apiserver/pkg/apiserver" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) // StartDefaultServer starts a test server. func StartDefaultServer(t servertesting.Logger, flags ...string) (func(), *rest.Config, *serveroptions.CustomResourceDefinitionsServerOptions, error) { + tearDownFn, s, err := startDefaultServer(t, flags...) + if err != nil { + return nil, nil, nil, err + } + return tearDownFn, s.ClientConfig, s.ServerOpts, nil +} + +func StartDefaultServerWithConfigAccess(t servertesting.Logger, flags ...string) (func(), *rest.Config, apiserver.CompletedConfig, error) { + tearDownFn, s, err := startDefaultServer(t, flags...) + if err != nil { + return nil, nil, apiserver.CompletedConfig{}, err + } + return tearDownFn, s.ClientConfig, s.CompletedConfig, nil +} + +func startDefaultServer(t servertesting.Logger, flags ...string) (func(), servertesting.TestServer, error) { // create kubeconfig which will not actually be used. But authz/authn needs it to startup. fakeKubeConfig, err := ioutil.TempFile("", "kubeconfig") if err != nil { - return nil, nil, nil, err + return nil, servertesting.TestServer{}, err } fakeKubeConfig.WriteString(` apiVersion: v1 @@ -75,7 +92,7 @@ users: ), nil) if err != nil { os.Remove(fakeKubeConfig.Name()) - return nil, nil, nil, err + return nil, servertesting.TestServer{}, err } tearDownFn := func() { @@ -83,7 +100,7 @@ users: s.TearDownFn() } - return tearDownFn, s.ClientConfig, s.ServerOpts, nil + return tearDownFn, s, nil } // StartDefaultServerWithClients starts a test server and returns clients for it. diff --git a/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go b/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go index 2101e6cb434..eb8f38925a9 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/test/integration/pruning_test.go @@ -298,7 +298,7 @@ func TestPruningStatus(t *testing.T) { } func TestPruningFromStorage(t *testing.T) { - tearDown, config, options, err := fixtures.StartDefaultServer(t) + tearDown, config, completedConfig, err := fixtures.StartDefaultServerWithConfigAccess(t) if err != nil { t.Fatal(err) } @@ -314,11 +314,6 @@ func TestPruningFromStorage(t *testing.T) { t.Fatal(err) } - serverConfig, err := options.Config() - if err != nil { - t.Fatal(err) - } - crd := pruningFixture.DeepCopy() crd.Spec.Versions[0].Schema = &apiextensionsv1.CustomResourceValidation{} if err := yaml.Unmarshal([]byte(fooSchema), &crd.Spec.Versions[0].Schema.OpenAPIV3Schema); err != nil { @@ -330,7 +325,7 @@ func TestPruningFromStorage(t *testing.T) { t.Fatal(err) } - restOptions, err := serverConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) + restOptions, err := completedConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go b/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go index 866e6149614..fc30efa92fa 100644 --- a/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go +++ b/staging/src/k8s.io/apiserver/pkg/apis/config/validation/validation.go @@ -43,10 +43,12 @@ const ( ) var ( - aesKeySizes = []int{16, 24, 32} // See https://golang.org/pkg/crypto/aes/#NewCipher for details on supported key sizes for AES. - secretBoxKeySizes = []int{32} + aesKeySizes = []int{16, 24, 32} + // See https://godoc.org/golang.org/x/crypto/nacl/secretbox#Open for details on the supported key sizes for Secretbox. + secretBoxKeySizes = []int{32} + root = field.NewPath("resources") ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index bb6c948614c..af750674200 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -94,16 +94,6 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker { }) } -func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) { - _, kmsHealthChecks, err := LoadEncryptionConfig(filepath, stopCh) - return kmsHealthChecks, err -} - -func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) { - transformers, _, err := LoadEncryptionConfig(filepath, stopCh) - return transformers, err -} - func LoadEncryptionConfig(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) { config, err := loadConfig(filepath) if err != nil { @@ -159,7 +149,7 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio for gr, transList := range resourceToPrefixTransformer { gr := gr transList := transList - transformers[gr] = value.NewMutableTransformer(value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)) + transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...) } return transformers, probes, nil @@ -425,8 +415,8 @@ var ( // The factory to create kms service. This is to make writing test easier. envelopeServiceFactory = envelope.NewGRPCService - // The factory to create kmsv2 service. - envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService + // The factory to create kmsv2 service. Exported for integration tests. + EnvelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService ) func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, error) { @@ -458,7 +448,7 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName) } - envelopeService, err := envelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration) + envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration) if err != nil { return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", kmsName, err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go index edff1c2896a..15c0e93cb95 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go @@ -163,12 +163,12 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() // Set factory for mock envelope service factory := envelopeServiceFactory - factoryKMSv2 := envelopeKMSv2ServiceFactory + factoryKMSv2 := EnvelopeKMSv2ServiceFactory envelopeServiceFactory = newMockEnvelopeService - envelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service + EnvelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service defer func() { envelopeServiceFactory = factory - envelopeKMSv2ServiceFactory = factoryKMSv2 + EnvelopeKMSv2ServiceFactory = factoryKMSv2 }() ctx := testContext(t) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 380d44d94bf..6a0a5891410 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -36,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/storage/storagebackend" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/apiserver/pkg/storage/value" + flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" "k8s.io/klog/v2" ) @@ -59,6 +60,15 @@ type EtcdOptions struct { DefaultWatchCacheSize int // WatchCacheSizes represents override to a given resource WatchCacheSizes []string + + // complete guards fields that must be initialized via Complete before the Apply methods can be used. + complete bool + transformerOverrides map[schema.GroupResource]value.Transformer + kmsPluginHealthzChecks []healthz.HealthChecker + + // SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints. + // This allows multiple invocations of the Apply methods without duplication of said endpoints. + SkipHealthEndpoints bool } var storageTypes = sets.NewString( @@ -190,39 +200,65 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } +// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting +// up objects that must be created once and reused across multiple invocations such as storage transformers. +// This method mutates the receiver (EtcdOptions). It must never mutate the inputs. +func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error { + if s == nil { + return nil + } + + if s.complete { + return fmt.Errorf("EtcdOptions.Complete called more than once") + } + + if len(s.EncryptionProviderConfigFilepath) != 0 { + transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, stopCh) + if err != nil { + return err + } + s.transformerOverrides = transformerOverrides + s.kmsPluginHealthzChecks = kmsPluginHealthzChecks + } + + s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker + + s.complete = true + + return nil +} + +// ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). func (s *EtcdOptions) ApplyTo(c *server.Config) error { if s == nil { return nil } - if err := s.addEtcdHealthEndpoint(c); err != nil { - return err + + return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: s.StorageConfig}, c) +} + +// ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions). +func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { + if s == nil { + return nil } - transformerOverrides := make(map[schema.GroupResource]value.Transformer) - if len(s.EncryptionProviderConfigFilepath) > 0 { - var err error - transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) - if err != nil { + + if !s.complete { + return fmt.Errorf("EtcdOptions.Apply called without completion") + } + + if !s.SkipHealthEndpoints { + if err := s.addEtcdHealthEndpoint(c); err != nil { return err } } - // use the StorageObjectCountTracker interface instance from server.Config - s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker - - c.RESTOptionsGetter = &SimpleRestOptionsFactory{ - Options: *s, - TransformerOverrides: transformerOverrides, + if len(s.transformerOverrides) > 0 { + factory = &transformerStorageFactory{ + delegate: factory, + transformerOverrides: s.transformerOverrides, + } } - return nil -} - -func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { - if err := s.addEtcdHealthEndpoint(c); err != nil { - return err - } - - // use the StorageObjectCountTracker interface instance from server.Config - s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} return nil @@ -245,57 +281,11 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { return readyCheck() })) - if s.EncryptionProviderConfigFilepath != "" { - kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) - if err != nil { - return err - } - c.AddHealthChecks(kmsPluginHealthzChecks...) - } + c.AddHealthChecks(s.kmsPluginHealthzChecks...) return nil } -type SimpleRestOptionsFactory struct { - Options EtcdOptions - TransformerOverrides map[schema.GroupResource]value.Transformer -} - -func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { - ret := generic.RESTOptions{ - StorageConfig: f.Options.StorageConfig.ForResource(resource), - Decorator: generic.UndecoratedStorage, - EnableGarbageCollection: f.Options.EnableGarbageCollection, - DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, - ResourcePrefix: resource.Group + "/" + resource.Resource, - CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, - StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, - } - if f.TransformerOverrides != nil { - if transformer, ok := f.TransformerOverrides[resource]; ok { - ret.StorageConfig.Transformer = transformer - } - } - if f.Options.EnableWatchCache { - sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) - if err != nil { - return generic.RESTOptions{}, err - } - size, ok := sizes[resource] - if ok && size > 0 { - klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) - } - if ok && size <= 0 { - klog.V(3).InfoS("Not using watch cache", "resource", resource) - ret.Decorator = generic.UndecoratedStorage - } else { - klog.V(3).InfoS("Using watch cache", "resource", resource) - ret.Decorator = genericregistry.StorageWithCacher() - } - } - return ret, nil -} - type StorageFactoryRestOptionsFactory struct { Options EtcdOptions StorageFactory serverstorage.StorageFactory @@ -316,6 +306,7 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker, } + if f.Options.EnableWatchCache { sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) if err != nil { @@ -326,8 +317,10 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) } if ok && size <= 0 { + klog.V(3).InfoS("Not using watch cache", "resource", resource) ret.Decorator = generic.UndecoratedStorage } else { + klog.V(3).InfoS("Using watch cache", "resource", resource) ret.Decorator = genericregistry.StorageWithCacher() } } @@ -369,3 +362,60 @@ func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]strin } return cacheSizes, nil } + +var _ serverstorage.StorageFactory = &SimpleStorageFactory{} + +// SimpleStorageFactory provides a StorageFactory implementation that should be used when different +// resources essentially share the same storage config (as defined by the given storagebackend.Config). +// It assumes the resources are stored at a path that is purely based on the schema.GroupResource. +// Users that need flexibility and per resource overrides should use DefaultStorageFactory instead. +type SimpleStorageFactory struct { + StorageConfig storagebackend.Config +} + +func (s *SimpleStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) { + return s.StorageConfig.ForResource(resource), nil +} + +func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) string { + return resource.Group + "/" + resource.Resource +} + +func (s *SimpleStorageFactory) Backends() []serverstorage.Backend { + // nothing should ever call this method but we still provide a functional implementation + return serverstorage.Backends(s.StorageConfig) +} + +var _ serverstorage.StorageFactory = &transformerStorageFactory{} + +type transformerStorageFactory struct { + delegate serverstorage.StorageFactory + transformerOverrides map[schema.GroupResource]value.Transformer +} + +func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) { + config, err := t.delegate.NewConfig(resource) + if err != nil { + return nil, err + } + + transformer, ok := t.transformerOverrides[resource] + if !ok { + return config, nil + } + + configCopy := *config + resourceConfig := configCopy.Config + resourceConfig.Transformer = transformer + configCopy.Config = resourceConfig + + return &configCopy, nil +} + +func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource) string { + return t.delegate.ResourcePrefix(resource) +} + +func (t *transformerStorageFactory) Backends() []serverstorage.Backend { + return t.delegate.Backends() +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go index 0d380f0762d..6fac3d53b7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd_test.go @@ -204,6 +204,7 @@ func TestKMSHealthzEndpoint(t *testing.T) { name string encryptionConfigPath string wantChecks []string + skipHealth bool }{ { name: "single kms-provider, expect single kms healthz check", @@ -215,6 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml", wantChecks: []string{"etcd", "kms-provider-0", "kms-provider-1"}, }, + { + name: "two kms-providers with skip, expect zero kms healthz checks", + encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml", + wantChecks: nil, + skipHealth: true, + }, } scheme := runtime.NewScheme() @@ -225,8 +232,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { serverConfig := server.NewConfig(codecs) etcdOptions := &EtcdOptions{ EncryptionProviderConfigFilepath: tc.encryptionConfigPath, + SkipHealthEndpoints: tc.skipHealth, } - if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil { + if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } + if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } @@ -244,12 +255,19 @@ func TestReadinessCheck(t *testing.T) { name string wantReadyzChecks []string wantHealthzChecks []string + skipHealth bool }{ { name: "Readyz should have etcd-readiness check", wantReadyzChecks: []string{"etcd", "etcd-readiness"}, wantHealthzChecks: []string{"etcd"}, }, + { + name: "skip health, Readyz should not have etcd-readiness check", + wantReadyzChecks: nil, + wantHealthzChecks: nil, + skipHealth: true, + }, } scheme := runtime.NewScheme() @@ -258,8 +276,11 @@ func TestReadinessCheck(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { serverConfig := server.NewConfig(codecs) - etcdOptions := &EtcdOptions{} - if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil { + etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth} + if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil { + t.Fatal(err) + } + if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 5746baa02d2..4fc2d5b9905 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -101,6 +101,9 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) { // ApplyTo adds RecommendedOptions to the server configuration. // pluginInitializers can be empty, it is only need for additional initializers. func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { + if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil { + return err + } if err := o.Etcd.ApplyTo(&config.Config); err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go index f2f69883ae0..5b1c24446c7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_factory.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage/storagebackend" - "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" ) @@ -112,8 +111,6 @@ type groupResourceOverrides struct { // decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of // returned decoders will be priority for attempt to decode. decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder - // transformer is optional and shall encrypt that resource at rest. - transformer value.Transformer // disablePaging will prevent paging on the provided resource. disablePaging bool } @@ -139,9 +136,6 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St if o.decoderDecoratorFn != nil { options.DecoderDecoratorFn = o.decoderDecoratorFn } - if o.transformer != nil { - config.Transformer = o.transformer - } if o.disablePaging { config.Paging = false } @@ -210,12 +204,6 @@ func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource s.Overrides[groupResource] = overrides } -func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) { - overrides := s.Overrides[groupResource] - overrides.transformer = transformer - s.Overrides[groupResource] = overrides -} - // AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) { for _, groupResource := range groupResources { @@ -291,25 +279,35 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (* // Backends returns all backends for all registered storage destinations. // Used for getting all instances for health validations. func (s *DefaultStorageFactory) Backends() []Backend { - servers := sets.NewString(s.StorageConfig.Transport.ServerList...) + return backends(s.StorageConfig, s.Overrides) +} - for _, overrides := range s.Overrides { +// Backends returns all backends for all registered storage destinations. +// Used for getting all instances for health validations. +func Backends(storageConfig storagebackend.Config) []Backend { + return backends(storageConfig, nil) +} + +func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend { + servers := sets.NewString(storageConfig.Transport.ServerList...) + + for _, overrides := range grOverrides { servers.Insert(overrides.etcdLocation...) } tlsConfig := &tls.Config{ InsecureSkipVerify: true, } - if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 { - cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile) + if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 { + cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile) if err != nil { klog.Errorf("failed to load key pair while getting backends: %s", err) } else { tlsConfig.Certificates = []tls.Certificate{cert} } } - if len(s.StorageConfig.Transport.TrustedCAFile) > 0 { - if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil { + if len(storageConfig.Transport.TrustedCAFile) > 0 { + if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil { klog.Errorf("failed to read ca file while getting backends: %s", err) } else { caPool := x509.NewCertPool() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index e23c360b8eb..c1785964956 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -48,7 +48,7 @@ import ( "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" - "k8s.io/apiserver/pkg/storage/value" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics/legacyregistry" tracing "k8s.io/component-base/tracing" @@ -395,7 +395,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. } transformer := c.Transformer if transformer == nil { - transformer = value.IdentityTransformer + transformer = identity.NewEncryptCheckTransformer() } return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 0966d234a99..4a3b1ddaa4f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -48,7 +48,7 @@ import ( "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing" - "k8s.io/apiserver/pkg/storage/value" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" @@ -107,7 +107,7 @@ func newPodList() runtime.Object { return &example.PodList{} } func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) return server, storage } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go index 7bdd5f6823e..8d967d70681 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/identity/identity.go @@ -24,6 +24,12 @@ import ( "k8s.io/apiserver/pkg/storage/value" ) +var ( + transformer = identityTransformer{} + encryptedPrefix = []byte("k8s:enc:") + errEncryptedData = fmt.Errorf("identity transformer tried to read encrypted data") +) + // identityTransformer performs no transformation on provided data, but validates // that the data is not encrypted data during TransformFromStorage type identityTransformer struct{} @@ -31,7 +37,7 @@ type identityTransformer struct{} // NewEncryptCheckTransformer returns an identityTransformer which returns an error // on attempts to read encrypted data func NewEncryptCheckTransformer() value.Transformer { - return identityTransformer{} + return transformer } // TransformFromStorage returns the input bytes if the data is not encrypted @@ -39,8 +45,8 @@ func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte // identityTransformer has to return an error if the data is encoded using another transformer. // JSON data starts with '{'. Protobuf data has a prefix 'k8s[\x00-\xFF]'. // Prefix 'k8s:enc:' is reserved for encrypted data on disk. - if bytes.HasPrefix(data, []byte("k8s:enc:")) { - return []byte{}, false, fmt.Errorf("identity transformer tried to read encrypted data") + if bytes.HasPrefix(data, encryptedPrefix) { + return nil, false, errEncryptedData } return data, false, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go index 3c0ca7a60da..970d8d36288 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/transformer.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "fmt" - "sync" "time" "k8s.io/apimachinery/pkg/util/errors" @@ -51,54 +50,11 @@ type Transformer interface { TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) } -type identityTransformer struct{} - -// IdentityTransformer performs no transformation of the provided data. -var IdentityTransformer Transformer = identityTransformer{} - -func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, bool, error) { - return data, false, nil -} -func (identityTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, error) { - return data, nil -} - // DefaultContext is a simple implementation of Context for a slice of bytes. type DefaultContext []byte // AuthenticatedData returns itself. -func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) } - -// MutableTransformer allows a transformer to be changed safely at runtime. -type MutableTransformer struct { - lock sync.RWMutex - transformer Transformer -} - -// NewMutableTransformer creates a transformer that can be updated at any time by calling Set() -func NewMutableTransformer(transformer Transformer) *MutableTransformer { - return &MutableTransformer{transformer: transformer} -} - -// Set updates the nested transformer. -func (t *MutableTransformer) Set(transformer Transformer) { - t.lock.Lock() - t.transformer = transformer - t.lock.Unlock() -} - -func (t *MutableTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, stale bool, err error) { - t.lock.RLock() - transformer := t.transformer - t.lock.RUnlock() - return transformer.TransformFromStorage(ctx, data, dataCtx) -} -func (t *MutableTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) { - t.lock.RLock() - transformer := t.transformer - t.lock.RUnlock() - return transformer.TransformToStorage(ctx, data, dataCtx) -} +func (c DefaultContext) AuthenticatedData() []byte { return c } // PrefixTransformer holds a transformer interface and the prefix that the transformation is located under. type PrefixTransformer struct { diff --git a/test/integration/controlplane/transformation/kmsv2_transformation_test.go b/test/integration/controlplane/transformation/kmsv2_transformation_test.go index d8db16d705d..dfb250a938f 100644 --- a/test/integration/controlplane/transformation/kmsv2_transformation_test.go +++ b/test/integration/controlplane/transformation/kmsv2_transformation_test.go @@ -29,17 +29,25 @@ import ( "time" "github.com/gogo/protobuf/proto" + + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server/options/encryptionconfig" "k8s.io/apiserver/pkg/storage/value" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" + "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1" kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" featuregatetesting "k8s.io/component-base/featuregate/testing" kmsv2api "k8s.io/kms/apis/v2alpha1" + "k8s.io/kubernetes/test/integration/etcd" ) type envelopekmsv2 struct { @@ -273,3 +281,81 @@ resources: mustBeHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig) mustBeUnHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig) } + +func TestKMSv2SingleService(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() + + var kmsv2Calls int + origEnvelopeKMSv2ServiceFactory := encryptionconfig.EnvelopeKMSv2ServiceFactory + encryptionconfig.EnvelopeKMSv2ServiceFactory = func(ctx context.Context, endpoint string, callTimeout time.Duration) (kmsv2.Service, error) { + kmsv2Calls++ + return origEnvelopeKMSv2ServiceFactory(ctx, endpoint, callTimeout) + } + t.Cleanup(func() { + encryptionconfig.EnvelopeKMSv2ServiceFactory = origEnvelopeKMSv2ServiceFactory + }) + + // check resources provided by the three servers that we have wired together + // - pods and config maps from KAS + // - CRDs and CRs from API extensions + // - API services from aggregator + encryptionConfig := ` +kind: EncryptionConfiguration +apiVersion: apiserver.config.k8s.io/v1 +resources: + - resources: + - pods + - configmaps + - customresourcedefinitions.apiextensions.k8s.io + - pandas.awesome.bears.com + - apiservices.apiregistration.k8s.io + providers: + - kms: + apiVersion: v2 + name: kms-provider + cachesize: 1000 + endpoint: unix:///@kms-provider.sock +` + + pluginMock, err := kmsv2mock.NewBase64Plugin("@kms-provider.sock") + if err != nil { + t.Fatalf("failed to create mock of KMSv2 Plugin: %v", err) + } + + go pluginMock.Start() + if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock); err != nil { + t.Fatalf("Failed start plugin, err: %v", err) + } + t.Cleanup(pluginMock.CleanUp) + + test, err := newTransformTest(t, encryptionConfig) + if err != nil { + t.Fatalf("failed to start KUBE API Server with encryptionConfig\n %s, error: %v", encryptionConfig, err) + } + t.Cleanup(test.cleanUp) + + // the storage registry for CRs is dynamic so create one to exercise the wiring + etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(test.kubeAPIServer.ClientConfig), false, etcd.GetCustomResourceDefinitionData()...) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + t.Cleanup(cancel) + + gvr := schema.GroupVersionResource{Group: "awesome.bears.com", Version: "v1", Resource: "pandas"} + stub := etcd.GetEtcdStorageData()[gvr].Stub + dynamicClient, obj, err := etcd.JSONToUnstructured(stub, "", &meta.RESTMapping{ + Resource: gvr, + GroupVersionKind: gvr.GroupVersion().WithKind("Panda"), + Scope: meta.RESTScopeRoot, + }, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig)) + if err != nil { + t.Fatal(err) + } + _, err = dynamicClient.Create(ctx, obj, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + if kmsv2Calls != 1 { + t.Fatalf("expected a single call to KMS v2 service factory: %v", kmsv2Calls) + } +}