k8s.io/apiserver: remove skewed completion from EtcdOptions

This commit is contained in:
Dr. Stefan Schimanski 2023-06-02 20:25:31 +02:00
parent f351c6d1ec
commit e9e4acb1dd
No known key found for this signature in database
GPG Key ID: 4C68E0F19F95EC33
14 changed files with 166 additions and 211 deletions

View File

@ -71,17 +71,13 @@ func createAPIExtensionsConfig(
apiextensionsapiserver.Scheme); err != nil {
return nil, err
}
crdRESTOptionsGetter, err := apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions)
if err != nil {
return nil, err
}
apiextensionsConfig := &apiextensionsapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
},
ExtraConfig: apiextensionsapiserver.ExtraConfig{
CRDRESTOptionsGetter: crdRESTOptionsGetter,
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, genericConfig.StorageObjectCountTracker),
MasterCount: masterCount,
AuthResolverWrapper: authResolverWrapper,
ServiceResolver: serviceResolver,

View File

@ -402,9 +402,6 @@ func buildGenericConfig(
} else {
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
if lastErr = s.Etcd.Complete(genericConfig.DrainedNotify(), genericConfig.AddPostStartHook); lastErr != nil {
return
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig

View File

@ -94,9 +94,6 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio
etcdOptions := options.NewEtcdOptions(storageConfig)
// unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests
etcdOptions.EnableWatchCache = false
if err := etcdOptions.Complete(config.GenericConfig.DrainedNotify(), config.GenericConfig.AddPostStartHook); err != nil {
t.Fatal(err)
}
err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig)
if err != nil {
t.Fatal(err)

View File

@ -21,7 +21,6 @@ import (
"io"
"net"
"net/url"
"reflect"
"github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
@ -36,6 +35,8 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
storagevalue "k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/apiserver/pkg/util/openapi"
"k8s.io/apiserver/pkg/util/proxy"
"k8s.io/apiserver/pkg/util/webhook"
@ -111,16 +112,12 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
if err := o.APIEnablement.ApplyTo(&serverConfig.Config, apiserver.DefaultAPIResourceConfigSource(), apiserver.Scheme); err != nil {
return nil, err
}
crdRESTOptionsGetter, err := NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd)
if err != nil {
return nil, err
}
serverConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions), openapinamer.NewDefinitionNamer(apiserver.Scheme, scheme.Scheme))
config := &apiserver.Config{
GenericConfig: serverConfig,
ExtraConfig: apiserver.ExtraConfig{
CRDRESTOptionsGetter: crdRESTOptionsGetter,
CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd, serverConfig.ResourceTransformers, serverConfig.StorageObjectCountTracker),
ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()},
AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, oteltrace.NewNoopTracerProvider()),
},
@ -129,30 +126,16 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
}
// NewCRDRESTOptionsGetter create a RESTOptionsGetter for CustomResources.
// This works on a copy of the etcd options so we don't mutate originals.
// We assume that the input etcd options have been completed already.
//
// Avoid messing with anything outside of changes to StorageConfig as that
// may lead to unexpected behavior when the options are applied.
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) (genericregistry.RESTOptionsGetter, error) {
etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
etcdOptions.WatchCacheSizes = nil // this control is not provided for custom resources
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions, resourceTransformers storagevalue.ResourceTransformers, tracker flowcontrolrequest.StorageObjectCountTracker) genericregistry.RESTOptionsGetter {
etcdOptionsCopy := etcdOptions
etcdOptionsCopy.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
etcdOptionsCopy.StorageConfig.StorageObjectCountTracker = tracker
etcdOptionsCopy.WatchCacheSizes = nil // this control is not provided for custom resources
// creates a generic apiserver config for etcdOptions to mutate
c := genericapiserver.Config{}
if err := etcdOptions.ApplyTo(&c); err != nil {
return nil, err
}
restOptionsGetter := c.RESTOptionsGetter
if restOptionsGetter == nil {
return nil, fmt.Errorf("server.Config RESTOptionsGetter should not be nil")
}
// sanity check that no other fields are set
c.RESTOptionsGetter = nil
if !reflect.DeepEqual(c, genericapiserver.Config{}) {
return nil, fmt.Errorf("only RESTOptionsGetter should have been mutated in server.Config")
}
return restOptionsGetter, nil
return etcdOptions.CreateRESTOptionsGetter(&genericoptions.SimpleStorageFactory{StorageConfig: etcdOptionsCopy.StorageConfig}, resourceTransformers)
}
type serviceResolver struct {

View File

@ -182,10 +182,7 @@ func testWebhookConverter(t *testing.T, watchCache bool) {
crd := multiVersionFixture.DeepCopy()
RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd)
if err != nil {
t.Fatal(err)
}
RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil)
restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
if err != nil {
t.Fatal(err)

View File

@ -658,10 +658,7 @@ func TestCustomResourceDefaultingOfMetaFields(t *testing.T) {
t.Logf("CR created: %#v", returnedFoo.UnstructuredContent())
// get persisted object
RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd)
if err != nil {
t.Fatal(err)
}
RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil)
restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
if err != nil {
t.Fatal(err)

View File

@ -31,6 +31,8 @@ import (
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing"
"k8s.io/apimachinery/pkg/runtime/schema"
genericapiserver "k8s.io/apiserver/pkg/server"
storagevalue "k8s.io/apiserver/pkg/storage/value"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
@ -145,10 +147,15 @@ func StartDefaultServerWithClientsAndEtcd(t servertesting.Logger, extraFlags ...
return nil, nil, nil, nil, "", err
}
RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd)
if err != nil {
return nil, nil, nil, nil, "", err
var resourceTransformers storagevalue.ResourceTransformers
if len(options.RecommendedOptions.Etcd.EncryptionProviderConfigFilepath) != 0 {
// be clever in tests to reconstruct the transformers, for encryption integration tests
config := genericapiserver.Config{}
options.RecommendedOptions.Etcd.ApplyTo(&config)
resourceTransformers = config.ResourceTransformers
}
RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, resourceTransformers, nil)
restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: "hopefully-ignored-group", Resource: "hopefully-ignored-resources"})
if err != nil {
return nil, nil, nil, nil, "", err

View File

@ -132,10 +132,7 @@ func TestInvalidObjectMetaInStorage(t *testing.T) {
t.Fatal(err)
}
RESTOptionsGetter, err := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd)
if err != nil {
t.Fatal(err)
}
RESTOptionsGetter := serveroptions.NewCRDRESTOptionsGetter(*options.RecommendedOptions.Etcd, nil, nil)
restOptions, err := RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: noxuDefinition.Spec.Group, Resource: noxuDefinition.Spec.Names.Plural})
if err != nil {
t.Fatal(err)

View File

@ -65,6 +65,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
serverstore "k8s.io/apiserver/pkg/server/storage"
storagevalue "k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@ -190,6 +191,8 @@ type Config struct {
// SkipOpenAPIInstallation avoids installing the OpenAPI handler if set to true.
SkipOpenAPIInstallation bool
// ResourceTransformers are used to transform resources from and to etcd, e.g. encryption.
ResourceTransformers storagevalue.ResourceTransformers
// RESTOptionsGetter is used to construct RESTStorage types via the generic registry.
RESTOptionsGetter genericregistry.RESTOptionsGetter

View File

@ -43,7 +43,7 @@ import (
"k8s.io/apiserver/pkg/apis/config/validation"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/storage/value"
storagevalue "k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope"
envelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
@ -159,7 +159,7 @@ func (h *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
// EncryptionConfiguration represents the parsed and normalized encryption configuration for the apiserver.
type EncryptionConfiguration struct {
// Transformers is a list of value.Transformer that will be used to encrypt and decrypt data.
Transformers map[schema.GroupResource]value.Transformer
Transformers map[schema.GroupResource]storagevalue.Transformer
// HealthChecks is a list of healthz.HealthChecker that will be used to check the health of the encryption providers.
HealthChecks []healthz.HealthChecker
@ -207,7 +207,7 @@ func LoadEncryptionConfig(ctx context.Context, filepath string, reload bool) (*E
// getTransformerOverridesAndKMSPluginHealthzCheckers creates the set of transformers and KMS healthz checks based on the given config.
// It may launch multiple go routines whose lifecycle is controlled by ctx.
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
func getTransformerOverridesAndKMSPluginHealthzCheckers(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
func getTransformerOverridesAndKMSPluginHealthzCheckers(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]storagevalue.Transformer, []healthz.HealthChecker, *kmsState, error) {
var kmsHealthChecks []healthz.HealthChecker
transformers, probes, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(ctx, config)
if err != nil {
@ -228,8 +228,8 @@ type healthChecker interface {
// getTransformerOverridesAndKMSPluginProbes creates the set of transformers and KMS probes based on the given config.
// It may launch multiple go routines whose lifecycle is controlled by ctx.
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthChecker, *kmsState, error) {
resourceToPrefixTransformer := map[schema.GroupResource][]value.PrefixTransformer{}
func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]storagevalue.Transformer, []healthChecker, *kmsState, error) {
resourceToPrefixTransformer := map[schema.GroupResource][]storagevalue.PrefixTransformer{}
var probes []healthChecker
var kmsUsed kmsState
@ -268,11 +268,11 @@ func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apis
probes = append(probes, p...)
}
transformers := make(map[schema.GroupResource]value.Transformer, len(resourceToPrefixTransformer))
transformers := make(map[schema.GroupResource]storagevalue.Transformer, len(resourceToPrefixTransformer))
for gr, transList := range resourceToPrefixTransformer {
gr := gr
transList := transList
transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)
transformers[gr] = storagevalue.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)
}
return transformers, probes, &kmsUsed, nil
@ -478,15 +478,15 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
// It may launch multiple go routines whose lifecycle is controlled by ctx.
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
var transformers []value.PrefixTransformer
func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.ResourceConfiguration) ([]storagevalue.PrefixTransformer, []healthChecker, *kmsState, error) {
var transformers []storagevalue.PrefixTransformer
var probes []healthChecker
var kmsUsed kmsState
for _, provider := range config.Providers {
provider := provider
var (
transformer value.PrefixTransformer
transformer storagevalue.PrefixTransformer
transformerErr error
probe healthChecker
used *kmsState
@ -497,7 +497,7 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res
transformer, transformerErr = aesPrefixTransformer(provider.AESGCM, aestransformer.NewGCMTransformer, aesGCMTransformerPrefixV1)
case provider.AESCBC != nil:
cbcTransformer := func(block cipher.Block) (value.Transformer, error) {
cbcTransformer := func(block cipher.Block) (storagevalue.Transformer, error) {
return aestransformer.NewCBCTransformer(block), nil
}
transformer, transformerErr = aesPrefixTransformer(provider.AESCBC, cbcTransformer, aesCBCTransformerPrefixV1)
@ -513,7 +513,7 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res
}
case provider.Identity != nil:
transformer = value.PrefixTransformer{
transformer = storagevalue.PrefixTransformer{
Transformer: identity.NewEncryptCheckTransformer(),
Prefix: []byte{},
}
@ -532,10 +532,10 @@ func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.Res
return transformers, probes, &kmsUsed, nil
}
type blockTransformerFunc func(cipher.Block) (value.Transformer, error)
type blockTransformerFunc func(cipher.Block) (storagevalue.Transformer, error)
func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTransformerFunc, prefix string) (value.PrefixTransformer, error) {
var result value.PrefixTransformer
func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTransformerFunc, prefix string) (storagevalue.PrefixTransformer, error) {
var result storagevalue.PrefixTransformer
if len(config.Keys) == 0 {
return result, fmt.Errorf("aes provider has no valid keys")
@ -550,7 +550,7 @@ func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTran
}
}
keyTransformers := []value.PrefixTransformer{}
keyTransformers := []storagevalue.PrefixTransformer{}
for _, keyData := range config.Keys {
keyData := keyData
@ -569,26 +569,26 @@ func aesPrefixTransformer(config *apiserverconfig.AESConfiguration, fn blockTran
// Create a new PrefixTransformer for this key
keyTransformers = append(keyTransformers,
value.PrefixTransformer{
storagevalue.PrefixTransformer{
Transformer: transformer,
Prefix: []byte(keyData.Name + ":"),
})
}
// Create a prefixTransformer which can choose between these keys
keyTransformer := value.NewPrefixTransformers(
keyTransformer := storagevalue.NewPrefixTransformers(
fmt.Errorf("no matching key was found for the provided AES transformer"), keyTransformers...)
// Create a PrefixTransformer which shall later be put in a list with other providers
result = value.PrefixTransformer{
result = storagevalue.PrefixTransformer{
Transformer: keyTransformer,
Prefix: []byte(prefix),
}
return result, nil
}
func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) (value.PrefixTransformer, error) {
var result value.PrefixTransformer
func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration) (storagevalue.PrefixTransformer, error) {
var result storagevalue.PrefixTransformer
if len(config.Keys) == 0 {
return result, fmt.Errorf("secretbox provider has no valid keys")
@ -603,7 +603,7 @@ func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration)
}
}
keyTransformers := []value.PrefixTransformer{}
keyTransformers := []storagevalue.PrefixTransformer{}
for _, keyData := range config.Keys {
keyData := keyData
@ -621,18 +621,18 @@ func secretboxPrefixTransformer(config *apiserverconfig.SecretboxConfiguration)
// Create a new PrefixTransformer for this key
keyTransformers = append(keyTransformers,
value.PrefixTransformer{
storagevalue.PrefixTransformer{
Transformer: secretbox.NewSecretboxTransformer(keyArray),
Prefix: []byte(keyData.Name + ":"),
})
}
// Create a prefixTransformer which can choose between these keys
keyTransformer := value.NewPrefixTransformers(
keyTransformer := storagevalue.NewPrefixTransformers(
fmt.Errorf("no matching key was found for the provided Secretbox transformer"), keyTransformers...)
// Create a PrefixTransformer which shall later be put in a list with other providers
result = value.PrefixTransformer{
result = storagevalue.PrefixTransformer{
Transformer: keyTransformer,
Prefix: []byte(secretboxTransformerPrefixV1),
}
@ -665,13 +665,13 @@ func (s *kmsState) accumulate(other *kmsState) {
// kmsPrefixTransformer creates a KMS transformer and probe based on the given KMS config.
// It may launch multiple go routines whose lifecycle is controlled by ctx.
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfiguration) (value.PrefixTransformer, healthChecker, *kmsState, error) {
func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfiguration) (storagevalue.PrefixTransformer, healthChecker, *kmsState, error) {
kmsName := config.Name
switch config.APIVersion {
case kmsAPIVersionV1:
envelopeService, err := envelopeServiceFactory(ctx, config.Endpoint, config.Timeout.Duration)
if err != nil {
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %w", kmsName, err)
return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %w", kmsName, err)
}
probe := &kmsPluginProbe{
@ -692,12 +692,12 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig
case kmsAPIVersionV2:
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName)
return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName)
}
envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Name, config.Timeout.Duration)
if err != nil {
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %w", kmsName, err)
return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %w", kmsName, err)
}
probe := &kmsv2PluginProbe{
@ -748,7 +748,7 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig
})
// using AES-GCM by default for encrypting data with KMSv2
transformer := value.PrefixTransformer{
transformer := storagevalue.PrefixTransformer{
Transformer: envelopekmsv2.NewEnvelopeTransformer(envelopeService, kmsName, probe.getCurrentState),
Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"),
}
@ -759,12 +759,12 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig
}, nil
default:
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion)
return storagevalue.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion)
}
}
func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelope.Service, prefix string) value.PrefixTransformer {
baseTransformerFunc := func(block cipher.Block) (value.Transformer, error) {
func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelope.Service, prefix string) storagevalue.PrefixTransformer {
baseTransformerFunc := func(block cipher.Block) (storagevalue.Transformer, error) {
gcm, err := aestransformer.NewGCMTransformer(block)
if err != nil {
return nil, err
@ -777,15 +777,15 @@ func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelop
return unionTransformers{gcm, aestransformer.NewCBCTransformer(block)}, nil
}
return value.PrefixTransformer{
return storagevalue.PrefixTransformer{
Transformer: envelope.NewEnvelopeTransformer(envelopeService, int(*config.CacheSize), baseTransformerFunc),
Prefix: []byte(prefix + config.Name + ":"),
}
}
type unionTransformers []value.Transformer
type unionTransformers []storagevalue.Transformer
func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) {
func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) (out []byte, stale bool, err error) {
var errs []error
for i := range u {
transformer := u[i]
@ -804,7 +804,7 @@ func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte
return nil, false, fmt.Errorf("unionTransformers: unable to transform from storage")
}
func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, err error) {
func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) (out []byte, err error) {
return u[0].TransformToStorage(ctx, data, dataCtx)
}
@ -815,7 +815,7 @@ func computeEncryptionConfigHash(data []byte) string {
return fmt.Sprintf("%x", sha256.Sum256(data))
}
var _ ResourceTransformers = &DynamicTransformers{}
var _ storagevalue.ResourceTransformers = &DynamicTransformers{}
var _ healthz.HealthChecker = &DynamicTransformers{}
// DynamicTransformers holds transformers that may be dynamically updated via a single external actor, likely a controller.
@ -825,7 +825,7 @@ type DynamicTransformers struct {
}
type transformTracker struct {
transformerOverrides map[schema.GroupResource]value.Transformer
transformerOverrides map[schema.GroupResource]storagevalue.Transformer
kmsPluginHealthzCheck healthz.HealthChecker
closeTransformers context.CancelFunc
kmsCloseGracePeriod time.Duration
@ -833,7 +833,7 @@ type transformTracker struct {
// NewDynamicTransformers returns transformers, health checks for kms providers and an ability to close transformers.
func NewDynamicTransformers(
transformerOverrides map[schema.GroupResource]value.Transformer,
transformerOverrides map[schema.GroupResource]storagevalue.Transformer,
kmsPluginHealthzCheck healthz.HealthChecker,
closeTransformers context.CancelFunc,
kmsCloseGracePeriod time.Duration,
@ -864,7 +864,7 @@ func (d *DynamicTransformers) Name() string {
}
// TransformerForResource returns the transformer for the given resource.
func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) storagevalue.Transformer {
return &resourceTransformer{
resource: resource,
transformTracker: d.transformTracker,
@ -873,7 +873,7 @@ func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResour
// Set sets the transformer overrides. This method is not go routine safe and must only be called by the same, single caller throughout the lifetime of this object.
func (d *DynamicTransformers) Set(
transformerOverrides map[schema.GroupResource]value.Transformer,
transformerOverrides map[schema.GroupResource]storagevalue.Transformer,
closeTransformers context.CancelFunc,
kmsPluginHealthzCheck healthz.HealthChecker,
kmsCloseGracePeriod time.Duration,
@ -898,34 +898,30 @@ func (d *DynamicTransformers) Set(
}()
}
var _ value.Transformer = &resourceTransformer{}
var _ storagevalue.Transformer = &resourceTransformer{}
type resourceTransformer struct {
resource schema.GroupResource
transformTracker *atomic.Value
}
func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) ([]byte, bool, error) {
return r.transformer().TransformFromStorage(ctx, data, dataCtx)
}
func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx storagevalue.Context) ([]byte, error) {
return r.transformer().TransformToStorage(ctx, data, dataCtx)
}
func (r *resourceTransformer) transformer() value.Transformer {
func (r *resourceTransformer) transformer() storagevalue.Transformer {
return transformerFromOverrides(r.transformTracker.Load().(*transformTracker).transformerOverrides, r.resource)
}
type ResourceTransformers interface {
TransformerForResource(resource schema.GroupResource) value.Transformer
}
var _ storagevalue.ResourceTransformers = &StaticTransformers{}
var _ ResourceTransformers = &StaticTransformers{}
type StaticTransformers map[schema.GroupResource]storagevalue.Transformer
type StaticTransformers map[schema.GroupResource]value.Transformer
func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) storagevalue.Transformer {
return transformerFromOverrides(s, resource)
}
@ -934,7 +930,7 @@ var anyGroupAnyResource = schema.GroupResource{
Resource: "*",
}
func transformerFromOverrides(transformerOverrides map[schema.GroupResource]value.Transformer, resource schema.GroupResource) value.Transformer {
func transformerFromOverrides(transformerOverrides map[schema.GroupResource]storagevalue.Transformer, resource schema.GroupResource) storagevalue.Transformer {
if transformer := transformerOverrides[resource]; transformer != nil {
return transformer
}

View File

@ -38,6 +38,7 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagevalue "k8s.io/apiserver/pkg/storage/value"
"k8s.io/klog/v2"
)
@ -63,11 +64,6 @@ type EtcdOptions struct {
// WatchCacheSizes represents override to a given resource
WatchCacheSizes []string
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
complete bool
resourceTransformers encryptionconfig.ResourceTransformers
kmsPluginHealthzChecks []healthz.HealthChecker
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
// This allows multiple invocations of the Apply methods without duplication of said endpoints.
SkipHealthEndpoints bool
@ -211,94 +207,18 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
}
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
// up objects that must be created once and reused across multiple invocations such as storage transformers.
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
func (s *EtcdOptions) Complete(
stopCh <-chan struct{},
addPostStartHook func(name string, hook server.PostStartHookFunc) error,
) error {
if s == nil {
return nil
}
if s.complete {
return fmt.Errorf("EtcdOptions.Complete called more than once")
}
if len(s.EncryptionProviderConfigFilepath) != 0 {
ctxServer := wait.ContextForChannel(stopCh)
// nolint:govet // The only code path where closeTransformers does not get called is when it gets stored in dynamicTransformers.
ctxTransformers, closeTransformers := context.WithCancel(ctxServer)
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload)
if err != nil {
// in case of error, we want to close partially initialized (if any) transformers
closeTransformers()
return err
}
// enable kms hot reload controller only if the config file is set to be automatically reloaded
if s.EncryptionProviderConfigAutomaticReload {
// with reload=true we will always have 1 health check
if len(encryptionConfiguration.HealthChecks) != 1 {
// in case of error, we want to close partially initialized (if any) transformers
closeTransformers()
return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled")
}
// Here the dynamic transformers take ownership of the transformers and their cancellation.
dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod)
// add post start hook to start hot reload controller
// adding this hook here will ensure that it gets configured exactly once
err = addPostStartHook(
"start-encryption-provider-config-automatic-reload",
func(_ server.PostStartHookContext) error {
dynamicEncryptionConfigController := encryptionconfigcontroller.NewDynamicEncryptionConfiguration(
"encryption-provider-config-automatic-reload-controller",
s.EncryptionProviderConfigFilepath,
dynamicTransformers,
encryptionConfiguration.EncryptionFileContentHash,
)
go dynamicEncryptionConfigController.Run(ctxServer)
return nil
},
)
if err != nil {
// in case of error, we want to close partially initialized (if any) transformers
closeTransformers()
return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err)
}
s.resourceTransformers = dynamicTransformers
s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers}
} else {
s.resourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers)
s.kmsPluginHealthzChecks = encryptionConfiguration.HealthChecks
}
}
s.complete = true
// nolint:govet // The only code path where closeTransformers does not get called is when it gets stored in dynamicTransformers.
return nil
}
// ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}
storageConfig := s.StorageConfig
if storageConfig.StorageObjectCountTracker == nil {
storageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
storageConfigCopy := s.StorageConfig
if storageConfigCopy.StorageObjectCountTracker == nil {
storageConfigCopy.StorageObjectCountTracker = c.StorageObjectCountTracker
}
return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: storageConfig}, c)
return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: storageConfigCopy}, c)
}
// ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
@ -307,24 +227,94 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
return nil
}
if !s.complete {
return fmt.Errorf("EtcdOptions.Apply called without completion")
}
if !s.SkipHealthEndpoints {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
}
if s.resourceTransformers != nil {
// setup encryption
if err := s.maybeApplyResourceTransformers(c); err != nil {
return err
}
c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
return nil
}
func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
if resourceTransformers != nil {
factory = &transformerStorageFactory{
delegate: factory,
resourceTransformers: s.resourceTransformers,
resourceTransformers: resourceTransformers,
}
}
return &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}
func (s *EtcdOptions) maybeApplyResourceTransformers(c *server.Config) (err error) {
if c.ResourceTransformers != nil {
return nil
}
if len(s.EncryptionProviderConfigFilepath) == 0 {
return nil
}
ctxServer := wait.ContextForChannel(c.DrainedNotify())
ctxTransformers, closeTransformers := context.WithCancel(ctxServer)
defer func() {
// in case of error, we want to close partially initialized (if any) transformers
if err != nil {
closeTransformers()
}
}()
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload)
if err != nil {
return err
}
if s.EncryptionProviderConfigAutomaticReload {
// with reload=true we will always have 1 health check
if len(encryptionConfiguration.HealthChecks) != 1 {
return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled")
}
// Here the dynamic transformers take ownership of the transformers and their cancellation.
dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod)
// add post start hook to start hot reload controller
// adding this hook here will ensure that it gets configured exactly once
err = c.AddPostStartHook(
"start-encryption-provider-config-automatic-reload",
func(_ server.PostStartHookContext) error {
dynamicEncryptionConfigController := encryptionconfigcontroller.NewDynamicEncryptionConfiguration(
"encryption-provider-config-automatic-reload-controller",
s.EncryptionProviderConfigFilepath,
dynamicTransformers,
encryptionConfiguration.EncryptionFileContentHash,
)
go dynamicEncryptionConfigController.Run(ctxServer)
return nil
},
)
if err != nil {
return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err)
}
c.ResourceTransformers = dynamicTransformers
if !s.SkipHealthEndpoints {
c.AddHealthChecks(dynamicTransformers)
}
} else {
c.ResourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers)
if !s.SkipHealthEndpoints {
c.AddHealthChecks(encryptionConfiguration.HealthChecks...)
}
}
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
@ -345,8 +335,6 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
return readyCheck()
}))
c.AddHealthChecks(s.kmsPluginHealthzChecks...)
return nil
}
@ -454,7 +442,7 @@ var _ serverstorage.StorageFactory = &transformerStorageFactory{}
type transformerStorageFactory struct {
delegate serverstorage.StorageFactory
resourceTransformers encryptionconfig.ResourceTransformers
resourceTransformers storagevalue.ResourceTransformers
}
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {

View File

@ -306,9 +306,6 @@ func TestKMSHealthzEndpoint(t *testing.T) {
EncryptionProviderConfigAutomaticReload: tc.reload,
SkipHealthEndpoints: tc.skipHealth,
}
if err := etcdOptions.Complete(serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}
@ -345,9 +342,6 @@ func TestReadinessCheck(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
if err := etcdOptions.Complete(serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}

View File

@ -101,9 +101,6 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
// ApplyTo adds RecommendedOptions to the server configuration.
// pluginInitializers can be empty, it is only need for additional initializers.
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Etcd.Complete(config.Config.DrainedNotify(), config.Config.AddPostStartHook); err != nil {
return err
}
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
return err
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/errors"
)
@ -50,6 +51,11 @@ type Transformer interface {
TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error)
}
// ResourceTransformers returns a transformer for the provided resource.
type ResourceTransformers interface {
TransformerForResource(resource schema.GroupResource) Transformer
}
// DefaultContext is a simple implementation of Context for a slice of bytes.
type DefaultContext []byte