Load encryption config once

This change updates the API server code to load the encryption
config once at start up instead of multiple times.  Previously the
code would set up the storage transformers and the etcd healthz
checks in separate parse steps.  This is problematic for KMS v2 key
ID based staleness checks which need to be able to assert that the
API server has a single view into the KMS plugin's current key ID.

Signed-off-by: Monis Khan <mok@microsoft.com>
This commit is contained in:
Monis Khan 2022-08-29 17:25:48 -04:00
parent c1602669a6
commit f507bc2553
No known key found for this signature in database
21 changed files with 348 additions and 230 deletions

View File

@ -37,7 +37,6 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
@ -91,11 +90,16 @@ func createAggregatorConfig(
}
// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIListChunking)
etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
return nil, err
}
// override MergedResourceConfig with aggregator defaults and registry
if err := commandOptions.APIEnablement.ApplyTo(

View File

@ -29,7 +29,6 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
kubeexternalinformers "k8s.io/client-go/informers"
@ -64,13 +63,18 @@ func createAPIExtensionsConfig(
}
// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
// this is where the true decodable levels come from.
etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
// prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
return nil, err
}
// override MergedResourceConfig with apiextensions defaults and registry
if err := commandOptions.APIEnablement.ApplyTo(

View File

@ -397,24 +397,23 @@ func buildGenericConfig(
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
storageFactory, lastErr = completedStorageFactoryConfig.New(genericConfig.DrainedNotify())
if lastErr != nil {
return
}
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
s.Etcd.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
s.Etcd.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
if lastErr = s.Etcd.Complete(genericConfig.StorageObjectCountTracker, genericConfig.DrainedNotify()); lastErr != nil {
return
}
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return

View File

@ -89,6 +89,9 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio
etcdOptions := options.NewEtcdOptions(storageConfig)
// unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests
etcdOptions.EnableWatchCache = false
if err := etcdOptions.Complete(config.GenericConfig.StorageObjectCountTracker, config.GenericConfig.DrainedNotify()); err != nil {
t.Fatal(err)
}
err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig)
if err != nil {
t.Fatal(err)

View File

@ -22,7 +22,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
serveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
"k8s.io/apiserver/pkg/server/resourceconfig"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -58,7 +57,6 @@ func DefaultWatchCacheSizes() map[schema.GroupResource]int {
// NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides.
func NewStorageFactoryConfig() *StorageFactoryConfig {
resources := []schema.GroupVersionResource{
// If a resource has to be stored in a version that is not the
// latest, then it can be listed here. Usually this is the case
@ -83,23 +81,22 @@ func NewStorageFactoryConfig() *StorageFactoryConfig {
// StorageFactoryConfig is a configuration for creating storage factory.
type StorageFactoryConfig struct {
StorageConfig storagebackend.Config
APIResourceConfig *serverstorage.ResourceConfig
DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig
DefaultStorageMediaType string
Serializer runtime.StorageSerializer
ResourceEncodingOverrides []schema.GroupVersionResource
EtcdServersOverrides []string
EncryptionProviderConfigFilepath string
StorageConfig storagebackend.Config
APIResourceConfig *serverstorage.ResourceConfig
DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig
DefaultStorageMediaType string
Serializer runtime.StorageSerializer
ResourceEncodingOverrides []schema.GroupVersionResource
EtcdServersOverrides []string
}
// Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig.
func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) {
// This method mutates the receiver (StorageFactoryConfig). It must never mutate the inputs.
func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) *completedStorageFactoryConfig {
c.StorageConfig = etcdOptions.StorageConfig
c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType
c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides
c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath
return &completedStorageFactoryConfig{c}, nil
return &completedStorageFactoryConfig{c}
}
// completedStorageFactoryConfig is a wrapper around StorageFactoryConfig completed with etcd options.
@ -111,7 +108,7 @@ type completedStorageFactoryConfig struct {
}
// New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstorage.DefaultStorageFactory, error) {
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) {
resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
storageFactory := serverstorage.NewDefaultStorageFactory(
c.StorageConfig,
@ -141,14 +138,5 @@ func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstor
servers := strings.Split(tokens[1], ";")
storageFactory.SetEtcdLocation(groupResource, servers)
}
if len(c.EncryptionProviderConfigFilepath) != 0 {
transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath, stopCh)
if err != nil {
return nil, err
}
for groupResource, transformer := range transformerOverrides {
storageFactory.SetTransformer(groupResource, transformer)
}
}
return storageFactory, nil
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package registrytest
import (
"context"
"testing"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -36,18 +35,12 @@ func NewEtcdStorage(t *testing.T, group string) (*storagebackend.ConfigForResour
func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
options := options.NewEtcdOptions(config)
completedConfig, err := kubeapiserver.NewStorageFactoryConfig().Complete(options)
if err != nil {
t.Fatal(err)
}
completedConfig := kubeapiserver.NewStorageFactoryConfig().Complete(options)
completedConfig.APIResourceConfig = serverstorage.NewResourceConfig()
factory, err := completedConfig.New(ctx.Done())
factory, err := completedConfig.New()
if err != nil {
t.Fatal(err)
}

View File

@ -28,13 +28,13 @@ import (
"github.com/spf13/pflag"
"k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
)
@ -47,10 +47,11 @@ type TestServerInstanceOptions struct {
// TestServer return values supplied by kube-test-ApiServer
type TestServer struct {
ClientConfig *restclient.Config // Rest client config
ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
ClientConfig *restclient.Config // Rest client config
ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
CompletedConfig apiserver.CompletedConfig
}
// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
@ -144,7 +145,8 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
server, err := config.Complete().New(genericapiserver.NewEmptyDelegate())
completedConfig := config.Complete()
server, err := completedConfig.New(genericapiserver.NewEmptyDelegate())
if err != nil {
return result, fmt.Errorf("failed to create server: %v", err)
}
@ -187,6 +189,7 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig
result.ServerOpts = s
result.TearDownFn = tearDown
result.CompletedConfig = completedConfig
return result, nil
}

View File

@ -26,21 +26,38 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
// StartDefaultServer starts a test server.
func StartDefaultServer(t servertesting.Logger, flags ...string) (func(), *rest.Config, *serveroptions.CustomResourceDefinitionsServerOptions, error) {
tearDownFn, s, err := startDefaultServer(t, flags...)
if err != nil {
return nil, nil, nil, err
}
return tearDownFn, s.ClientConfig, s.ServerOpts, nil
}
func StartDefaultServerWithConfigAccess(t servertesting.Logger, flags ...string) (func(), *rest.Config, apiserver.CompletedConfig, error) {
tearDownFn, s, err := startDefaultServer(t, flags...)
if err != nil {
return nil, nil, apiserver.CompletedConfig{}, err
}
return tearDownFn, s.ClientConfig, s.CompletedConfig, nil
}
func startDefaultServer(t servertesting.Logger, flags ...string) (func(), servertesting.TestServer, error) {
// create kubeconfig which will not actually be used. But authz/authn needs it to startup.
fakeKubeConfig, err := ioutil.TempFile("", "kubeconfig")
if err != nil {
return nil, nil, nil, err
return nil, servertesting.TestServer{}, err
}
fakeKubeConfig.WriteString(`
apiVersion: v1
@ -75,7 +92,7 @@ users:
), nil)
if err != nil {
os.Remove(fakeKubeConfig.Name())
return nil, nil, nil, err
return nil, servertesting.TestServer{}, err
}
tearDownFn := func() {
@ -83,7 +100,7 @@ users:
s.TearDownFn()
}
return tearDownFn, s.ClientConfig, s.ServerOpts, nil
return tearDownFn, s, nil
}
// StartDefaultServerWithClients starts a test server and returns clients for it.

View File

@ -298,7 +298,7 @@ func TestPruningStatus(t *testing.T) {
}
func TestPruningFromStorage(t *testing.T) {
tearDown, config, options, err := fixtures.StartDefaultServer(t)
tearDown, config, completedConfig, err := fixtures.StartDefaultServerWithConfigAccess(t)
if err != nil {
t.Fatal(err)
}
@ -314,11 +314,6 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err)
}
serverConfig, err := options.Config()
if err != nil {
t.Fatal(err)
}
crd := pruningFixture.DeepCopy()
crd.Spec.Versions[0].Schema = &apiextensionsv1.CustomResourceValidation{}
if err := yaml.Unmarshal([]byte(fooSchema), &crd.Spec.Versions[0].Schema.OpenAPIV3Schema); err != nil {
@ -330,7 +325,7 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err)
}
restOptions, err := serverConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
restOptions, err := completedConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
if err != nil {
t.Fatal(err)
}

View File

@ -43,10 +43,12 @@ const (
)
var (
aesKeySizes = []int{16, 24, 32}
// See https://golang.org/pkg/crypto/aes/#NewCipher for details on supported key sizes for AES.
secretBoxKeySizes = []int{32}
aesKeySizes = []int{16, 24, 32}
// See https://godoc.org/golang.org/x/crypto/nacl/secretbox#Open for details on the supported key sizes for Secretbox.
secretBoxKeySizes = []int{32}
root = field.NewPath("resources")
)

View File

@ -94,16 +94,6 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
})
}
func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) {
_, kmsHealthChecks, err := LoadEncryptionConfig(filepath, stopCh)
return kmsHealthChecks, err
}
func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
transformers, _, err := LoadEncryptionConfig(filepath, stopCh)
return transformers, err
}
func LoadEncryptionConfig(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) {
config, err := loadConfig(filepath)
if err != nil {
@ -159,7 +149,7 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio
for gr, transList := range resourceToPrefixTransformer {
gr := gr
transList := transList
transformers[gr] = value.NewMutableTransformer(value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...))
transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)
}
return transformers, probes, nil
@ -425,8 +415,8 @@ var (
// The factory to create kms service. This is to make writing test easier.
envelopeServiceFactory = envelope.NewGRPCService
// The factory to create kmsv2 service.
envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
// The factory to create kmsv2 service. Exported for integration tests.
EnvelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
)
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, error) {
@ -458,7 +448,7 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName)
}
envelopeService, err := envelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration)
envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration)
if err != nil {
return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", kmsName, err)
}

View File

@ -163,12 +163,12 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
// Set factory for mock envelope service
factory := envelopeServiceFactory
factoryKMSv2 := envelopeKMSv2ServiceFactory
factoryKMSv2 := EnvelopeKMSv2ServiceFactory
envelopeServiceFactory = newMockEnvelopeService
envelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service
EnvelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service
defer func() {
envelopeServiceFactory = factory
envelopeKMSv2ServiceFactory = factoryKMSv2
EnvelopeKMSv2ServiceFactory = factoryKMSv2
}()
ctx := testContext(t)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
)
@ -59,6 +60,15 @@ type EtcdOptions struct {
DefaultWatchCacheSize int
// WatchCacheSizes represents override to a given resource
WatchCacheSizes []string
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
complete bool
transformerOverrides map[schema.GroupResource]value.Transformer
kmsPluginHealthzChecks []healthz.HealthChecker
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
// This allows multiple invocations of the Apply methods without duplication of said endpoints.
SkipHealthEndpoints bool
}
var storageTypes = sets.NewString(
@ -190,39 +200,65 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
}
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
// up objects that must be created once and reused across multiple invocations such as storage transformers.
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error {
if s == nil {
return nil
}
if s.complete {
return fmt.Errorf("EtcdOptions.Complete called more than once")
}
if len(s.EncryptionProviderConfigFilepath) != 0 {
transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, stopCh)
if err != nil {
return err
}
s.transformerOverrides = transformerOverrides
s.kmsPluginHealthzChecks = kmsPluginHealthzChecks
}
s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker
s.complete = true
return nil
}
// ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: s.StorageConfig}, c)
}
// ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if s == nil {
return nil
}
transformerOverrides := make(map[schema.GroupResource]value.Transformer)
if len(s.EncryptionProviderConfigFilepath) > 0 {
var err error
transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil {
if !s.complete {
return fmt.Errorf("EtcdOptions.Apply called without completion")
}
if !s.SkipHealthEndpoints {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &SimpleRestOptionsFactory{
Options: *s,
TransformerOverrides: transformerOverrides,
if len(s.transformerOverrides) > 0 {
factory = &transformerStorageFactory{
delegate: factory,
transformerOverrides: s.transformerOverrides,
}
}
return nil
}
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
@ -245,57 +281,11 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
return readyCheck()
}))
if s.EncryptionProviderConfigFilepath != "" {
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil {
return err
}
c.AddHealthChecks(kmsPluginHealthzChecks...)
}
c.AddHealthChecks(s.kmsPluginHealthzChecks...)
return nil
}
type SimpleRestOptionsFactory struct {
Options EtcdOptions
TransformerOverrides map[schema.GroupResource]value.Transformer
}
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: f.Options.StorageConfig.ForResource(resource),
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.TransformerOverrides != nil {
if transformer, ok := f.TransformerOverrides[resource]; ok {
ret.StorageConfig.Transformer = transformer
}
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
size, ok := sizes[resource]
if ok && size > 0 {
klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
}
if ok && size <= 0 {
klog.V(3).InfoS("Not using watch cache", "resource", resource)
ret.Decorator = generic.UndecoratedStorage
} else {
klog.V(3).InfoS("Using watch cache", "resource", resource)
ret.Decorator = genericregistry.StorageWithCacher()
}
}
return ret, nil
}
type StorageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
@ -316,6 +306,7 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
@ -326,8 +317,10 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
}
if ok && size <= 0 {
klog.V(3).InfoS("Not using watch cache", "resource", resource)
ret.Decorator = generic.UndecoratedStorage
} else {
klog.V(3).InfoS("Using watch cache", "resource", resource)
ret.Decorator = genericregistry.StorageWithCacher()
}
}
@ -369,3 +362,60 @@ func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]strin
}
return cacheSizes, nil
}
var _ serverstorage.StorageFactory = &SimpleStorageFactory{}
// SimpleStorageFactory provides a StorageFactory implementation that should be used when different
// resources essentially share the same storage config (as defined by the given storagebackend.Config).
// It assumes the resources are stored at a path that is purely based on the schema.GroupResource.
// Users that need flexibility and per resource overrides should use DefaultStorageFactory instead.
type SimpleStorageFactory struct {
StorageConfig storagebackend.Config
}
func (s *SimpleStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
return s.StorageConfig.ForResource(resource), nil
}
func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) string {
return resource.Group + "/" + resource.Resource
}
func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
// nothing should ever call this method but we still provide a functional implementation
return serverstorage.Backends(s.StorageConfig)
}
var _ serverstorage.StorageFactory = &transformerStorageFactory{}
type transformerStorageFactory struct {
delegate serverstorage.StorageFactory
transformerOverrides map[schema.GroupResource]value.Transformer
}
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
config, err := t.delegate.NewConfig(resource)
if err != nil {
return nil, err
}
transformer, ok := t.transformerOverrides[resource]
if !ok {
return config, nil
}
configCopy := *config
resourceConfig := configCopy.Config
resourceConfig.Transformer = transformer
configCopy.Config = resourceConfig
return &configCopy, nil
}
func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource) string {
return t.delegate.ResourcePrefix(resource)
}
func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
return t.delegate.Backends()
}

View File

@ -204,6 +204,7 @@ func TestKMSHealthzEndpoint(t *testing.T) {
name string
encryptionConfigPath string
wantChecks []string
skipHealth bool
}{
{
name: "single kms-provider, expect single kms healthz check",
@ -215,6 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml",
wantChecks: []string{"etcd", "kms-provider-0", "kms-provider-1"},
},
{
name: "two kms-providers with skip, expect zero kms healthz checks",
encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml",
wantChecks: nil,
skipHealth: true,
},
}
scheme := runtime.NewScheme()
@ -225,8 +232,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{
EncryptionProviderConfigFilepath: tc.encryptionConfigPath,
SkipHealthEndpoints: tc.skipHealth,
}
if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil {
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}
@ -244,12 +255,19 @@ func TestReadinessCheck(t *testing.T) {
name string
wantReadyzChecks []string
wantHealthzChecks []string
skipHealth bool
}{
{
name: "Readyz should have etcd-readiness check",
wantReadyzChecks: []string{"etcd", "etcd-readiness"},
wantHealthzChecks: []string{"etcd"},
},
{
name: "skip health, Readyz should not have etcd-readiness check",
wantReadyzChecks: nil,
wantHealthzChecks: nil,
skipHealth: true,
},
}
scheme := runtime.NewScheme()
@ -258,8 +276,11 @@ func TestReadinessCheck(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{}
if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil {
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}

View File

@ -101,6 +101,9 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
// ApplyTo adds RecommendedOptions to the server configuration.
// pluginInitializers can be empty, it is only need for additional initializers.
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil {
return err
}
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
return err
}

View File

@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
@ -112,8 +111,6 @@ type groupResourceOverrides struct {
// decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of
// returned decoders will be priority for attempt to decode.
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
// transformer is optional and shall encrypt that resource at rest.
transformer value.Transformer
// disablePaging will prevent paging on the provided resource.
disablePaging bool
}
@ -139,9 +136,6 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
if o.transformer != nil {
config.Transformer = o.transformer
}
if o.disablePaging {
config.Paging = false
}
@ -210,12 +204,6 @@ func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) {
overrides := s.Overrides[groupResource]
overrides.transformer = transformer
s.Overrides[groupResource] = overrides
}
// AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
@ -291,25 +279,35 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend {
servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
return backends(s.StorageConfig, s.Overrides)
}
for _, overrides := range s.Overrides {
// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func Backends(storageConfig storagebackend.Config) []Backend {
return backends(storageConfig, nil)
}
func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend {
servers := sets.NewString(storageConfig.Transport.ServerList...)
for _, overrides := range grOverrides {
servers.Insert(overrides.etcdLocation...)
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile)
if err != nil {
klog.Errorf("failed to load key pair while getting backends: %s", err)
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
if len(storageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil {
klog.Errorf("failed to read ca file while getting backends: %s", err)
} else {
caPool := x509.NewCertPool()

View File

@ -48,7 +48,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
@ -395,7 +395,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

View File

@ -48,7 +48,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
@ -107,7 +107,7 @@ func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}

View File

@ -24,6 +24,12 @@ import (
"k8s.io/apiserver/pkg/storage/value"
)
var (
transformer = identityTransformer{}
encryptedPrefix = []byte("k8s:enc:")
errEncryptedData = fmt.Errorf("identity transformer tried to read encrypted data")
)
// identityTransformer performs no transformation on provided data, but validates
// that the data is not encrypted data during TransformFromStorage
type identityTransformer struct{}
@ -31,7 +37,7 @@ type identityTransformer struct{}
// NewEncryptCheckTransformer returns an identityTransformer which returns an error
// on attempts to read encrypted data
func NewEncryptCheckTransformer() value.Transformer {
return identityTransformer{}
return transformer
}
// TransformFromStorage returns the input bytes if the data is not encrypted
@ -39,8 +45,8 @@ func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte
// identityTransformer has to return an error if the data is encoded using another transformer.
// JSON data starts with '{'. Protobuf data has a prefix 'k8s[\x00-\xFF]'.
// Prefix 'k8s:enc:' is reserved for encrypted data on disk.
if bytes.HasPrefix(data, []byte("k8s:enc:")) {
return []byte{}, false, fmt.Errorf("identity transformer tried to read encrypted data")
if bytes.HasPrefix(data, encryptedPrefix) {
return nil, false, errEncryptedData
}
return data, false, nil
}

View File

@ -21,7 +21,6 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/errors"
@ -51,54 +50,11 @@ type Transformer interface {
TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error)
}
type identityTransformer struct{}
// IdentityTransformer performs no transformation of the provided data.
var IdentityTransformer Transformer = identityTransformer{}
func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, bool, error) {
return data, false, nil
}
func (identityTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, error) {
return data, nil
}
// DefaultContext is a simple implementation of Context for a slice of bytes.
type DefaultContext []byte
// AuthenticatedData returns itself.
func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) }
// MutableTransformer allows a transformer to be changed safely at runtime.
type MutableTransformer struct {
lock sync.RWMutex
transformer Transformer
}
// NewMutableTransformer creates a transformer that can be updated at any time by calling Set()
func NewMutableTransformer(transformer Transformer) *MutableTransformer {
return &MutableTransformer{transformer: transformer}
}
// Set updates the nested transformer.
func (t *MutableTransformer) Set(transformer Transformer) {
t.lock.Lock()
t.transformer = transformer
t.lock.Unlock()
}
func (t *MutableTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, stale bool, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformFromStorage(ctx, data, dataCtx)
}
func (t *MutableTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformToStorage(ctx, data, dataCtx)
}
func (c DefaultContext) AuthenticatedData() []byte { return c }
// PrefixTransformer holds a transformer interface and the prefix that the transformation is located under.
type PrefixTransformer struct {

View File

@ -29,17 +29,25 @@ import (
"time"
"github.com/gogo/protobuf/proto"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
"k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1"
kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kmsv2api "k8s.io/kms/apis/v2alpha1"
"k8s.io/kubernetes/test/integration/etcd"
)
type envelopekmsv2 struct {
@ -273,3 +281,81 @@ resources:
mustBeHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig)
mustBeUnHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig)
}
func TestKMSv2SingleService(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
var kmsv2Calls int
origEnvelopeKMSv2ServiceFactory := encryptionconfig.EnvelopeKMSv2ServiceFactory
encryptionconfig.EnvelopeKMSv2ServiceFactory = func(ctx context.Context, endpoint string, callTimeout time.Duration) (kmsv2.Service, error) {
kmsv2Calls++
return origEnvelopeKMSv2ServiceFactory(ctx, endpoint, callTimeout)
}
t.Cleanup(func() {
encryptionconfig.EnvelopeKMSv2ServiceFactory = origEnvelopeKMSv2ServiceFactory
})
// check resources provided by the three servers that we have wired together
// - pods and config maps from KAS
// - CRDs and CRs from API extensions
// - API services from aggregator
encryptionConfig := `
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- pods
- configmaps
- customresourcedefinitions.apiextensions.k8s.io
- pandas.awesome.bears.com
- apiservices.apiregistration.k8s.io
providers:
- kms:
apiVersion: v2
name: kms-provider
cachesize: 1000
endpoint: unix:///@kms-provider.sock
`
pluginMock, err := kmsv2mock.NewBase64Plugin("@kms-provider.sock")
if err != nil {
t.Fatalf("failed to create mock of KMSv2 Plugin: %v", err)
}
go pluginMock.Start()
if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock); err != nil {
t.Fatalf("Failed start plugin, err: %v", err)
}
t.Cleanup(pluginMock.CleanUp)
test, err := newTransformTest(t, encryptionConfig)
if err != nil {
t.Fatalf("failed to start KUBE API Server with encryptionConfig\n %s, error: %v", encryptionConfig, err)
}
t.Cleanup(test.cleanUp)
// the storage registry for CRs is dynamic so create one to exercise the wiring
etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(test.kubeAPIServer.ClientConfig), false, etcd.GetCustomResourceDefinitionData()...)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
gvr := schema.GroupVersionResource{Group: "awesome.bears.com", Version: "v1", Resource: "pandas"}
stub := etcd.GetEtcdStorageData()[gvr].Stub
dynamicClient, obj, err := etcd.JSONToUnstructured(stub, "", &meta.RESTMapping{
Resource: gvr,
GroupVersionKind: gvr.GroupVersion().WithKind("Panda"),
Scope: meta.RESTScopeRoot,
}, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
if err != nil {
t.Fatal(err)
}
_, err = dynamicClient.Create(ctx, obj, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
if kmsv2Calls != 1 {
t.Fatalf("expected a single call to KMS v2 service factory: %v", kmsv2Calls)
}
}