mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #114370 from enj/enj/r/reload_nits
encryption-at-rest: clean up context usage and duplicated code
This commit is contained in:
commit
c9ed04762f
@ -36,7 +36,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
|
||||
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
|
||||
"k8s.io/apiserver/pkg/apis/config/validation"
|
||||
@ -61,7 +60,13 @@ const (
|
||||
kmsPluginHealthzPositiveTTL = 20 * time.Second
|
||||
kmsAPIVersionV1 = "v1"
|
||||
kmsAPIVersionV2 = "v2"
|
||||
kmsReloadHealthCheckName = "kms-providers"
|
||||
// this name is used for two different healthz endpoints:
|
||||
// - when one or more KMS v2 plugins are in use and no KMS v1 plugins are in use
|
||||
// in this case, all v2 plugins are probed via this single endpoint
|
||||
// - when automatic reload of encryption config is enabled
|
||||
// in this case, all KMS plugins are probed via this single endpoint
|
||||
// the endpoint is present even if there are no KMS plugins configured (it is a no-op then)
|
||||
kmsReloadHealthCheckName = "kms-providers"
|
||||
)
|
||||
|
||||
type kmsPluginHealthzResponse struct {
|
||||
@ -133,15 +138,16 @@ type EncryptionConfiguration struct {
|
||||
}
|
||||
|
||||
// LoadEncryptionConfig parses and validates the encryption config specified by filepath.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by stopCh.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by ctx.
|
||||
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
|
||||
// If reload is true, or KMS v2 plugins are used with no KMS v1 plugins, the returned slice of health checkers will always be of length 1.
|
||||
func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{}) (*EncryptionConfiguration, error) {
|
||||
func LoadEncryptionConfig(ctx context.Context, filepath string, reload bool) (*EncryptionConfiguration, error) {
|
||||
config, contentHash, err := loadConfig(filepath, reload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while parsing file: %w", err)
|
||||
}
|
||||
|
||||
transformers, kmsHealthChecks, kmsUsed, err := getTransformerOverridesAndKMSPluginHealthzCheckers(config, stopCh)
|
||||
transformers, kmsHealthChecks, kmsUsed, err := getTransformerOverridesAndKMSPluginHealthzCheckers(ctx, config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while building transformers: %w", err)
|
||||
}
|
||||
@ -160,12 +166,15 @@ func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{})
|
||||
HealthChecks: kmsHealthChecks,
|
||||
EncryptionFileContentHash: contentHash,
|
||||
KMSCloseGracePeriod: 2 * kmsUsed.kmsTimeoutSum,
|
||||
}, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getTransformerOverridesAndKMSPluginHealthzCheckers(config *apiserverconfig.EncryptionConfiguration, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
|
||||
// getTransformerOverridesAndKMSPluginHealthzCheckers creates the set of transformers and KMS healthz checks based on the given config.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by ctx.
|
||||
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
|
||||
func getTransformerOverridesAndKMSPluginHealthzCheckers(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
|
||||
var kmsHealthChecks []healthz.HealthChecker
|
||||
transformers, probes, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(config, stopCh)
|
||||
transformers, probes, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(ctx, config)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
@ -181,7 +190,10 @@ type healthChecker interface {
|
||||
toHealthzCheck(idx int) healthz.HealthChecker
|
||||
}
|
||||
|
||||
func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.EncryptionConfiguration, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthChecker, *kmsState, error) {
|
||||
// getTransformerOverridesAndKMSPluginProbes creates the set of transformers and KMS probes based on the given config.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by ctx.
|
||||
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
|
||||
func getTransformerOverridesAndKMSPluginProbes(ctx context.Context, config *apiserverconfig.EncryptionConfiguration) (map[schema.GroupResource]value.Transformer, []healthChecker, *kmsState, error) {
|
||||
resourceToPrefixTransformer := map[schema.GroupResource][]value.PrefixTransformer{}
|
||||
var probes []healthChecker
|
||||
var kmsUsed kmsState
|
||||
@ -190,14 +202,11 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio
|
||||
for _, resourceConfig := range config.Resources {
|
||||
resourceConfig := resourceConfig
|
||||
|
||||
transformers, p, used, err := prefixTransformersAndProbes(resourceConfig, stopCh)
|
||||
transformers, p, used, err := prefixTransformersAndProbes(ctx, resourceConfig)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||
|
||||
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||
kmsUsed.accumulate(used)
|
||||
|
||||
// For each resource, create a list of providers to use
|
||||
for _, resource := range resourceConfig.Resources {
|
||||
@ -326,7 +335,10 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
|
||||
return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||
}
|
||||
|
||||
func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
|
||||
// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by ctx.
|
||||
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
|
||||
func prefixTransformersAndProbes(ctx context.Context, config apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
|
||||
var transformers []value.PrefixTransformer
|
||||
var probes []healthChecker
|
||||
var kmsUsed kmsState
|
||||
@ -351,14 +363,10 @@ func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, s
|
||||
transformer, transformerErr = secretboxPrefixTransformer(provider.Secretbox)
|
||||
|
||||
case provider.KMS != nil:
|
||||
transformer, probe, used, transformerErr = kmsPrefixTransformer(provider.KMS, stopCh)
|
||||
transformer, probe, used, transformerErr = kmsPrefixTransformer(ctx, provider.KMS)
|
||||
if transformerErr == nil {
|
||||
probes = append(probes, probe)
|
||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||
|
||||
// calculate the maximum timeout for all KMS providers
|
||||
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||
kmsUsed.accumulate(used)
|
||||
}
|
||||
|
||||
case provider.Identity != nil:
|
||||
@ -497,10 +505,20 @@ type kmsState struct {
|
||||
kmsTimeoutSum time.Duration
|
||||
}
|
||||
|
||||
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, *kmsState, error) {
|
||||
// we ignore the cancel func because this context should only be canceled when stopCh is closed
|
||||
ctx, _ := wait.ContextForChannel(stopCh)
|
||||
// accumulate computes the KMS state by:
|
||||
// - determining which KMS plugin versions are in use
|
||||
// - calculating kmsTimeoutSum which is used as transformTracker.kmsCloseGracePeriod
|
||||
// DynamicTransformers.Set waits for this period before closing old transformers after a config reload
|
||||
func (s *kmsState) accumulate(other *kmsState) {
|
||||
s.v1Used = s.v1Used || other.v1Used
|
||||
s.v2Used = s.v2Used || other.v2Used
|
||||
s.kmsTimeoutSum += other.kmsTimeoutSum
|
||||
}
|
||||
|
||||
// kmsPrefixTransformer creates a KMS transformer and probe based on the given KMS config.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by ctx.
|
||||
// In case of an error, the caller is responsible for canceling ctx to clean up any go routines that may have been launched.
|
||||
func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfiguration) (value.PrefixTransformer, healthChecker, *kmsState, error) {
|
||||
kmsName := config.Name
|
||||
switch config.APIVersion {
|
||||
case kmsAPIVersionV1:
|
||||
@ -606,6 +624,7 @@ func computeEncryptionConfigHash(data []byte) string {
|
||||
return fmt.Sprintf("%x", sha256.Sum256(data))
|
||||
}
|
||||
|
||||
var _ ResourceTransformers = &DynamicTransformers{}
|
||||
var _ healthz.HealthChecker = &DynamicTransformers{}
|
||||
|
||||
// DynamicTransformers holds transformers that may be dynamically updated via a single external actor, likely a controller.
|
||||
@ -704,25 +723,23 @@ func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byt
|
||||
}
|
||||
|
||||
func (r *resourceTransformer) transformer() value.Transformer {
|
||||
transformer := r.transformTracker.Load().(*transformTracker).transformerOverrides[r.resource]
|
||||
if transformer == nil {
|
||||
return identity.NewEncryptCheckTransformer()
|
||||
}
|
||||
return transformer
|
||||
return transformerFromOverrides(r.transformTracker.Load().(*transformTracker).transformerOverrides, r.resource)
|
||||
}
|
||||
|
||||
type ResourceTransformers interface {
|
||||
TransformerForResource(resource schema.GroupResource) value.Transformer
|
||||
}
|
||||
|
||||
var _ ResourceTransformers = &DynamicTransformers{}
|
||||
var _ ResourceTransformers = &StaticTransformers{}
|
||||
|
||||
type StaticTransformers map[schema.GroupResource]value.Transformer
|
||||
|
||||
// StaticTransformers
|
||||
func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
|
||||
transformer := s[resource]
|
||||
return transformerFromOverrides(s, resource)
|
||||
}
|
||||
|
||||
func transformerFromOverrides(transformerOverrides map[schema.GroupResource]value.Transformer, resource schema.GroupResource) value.Transformer {
|
||||
transformer := transformerOverrides[resource]
|
||||
if transformer == nil {
|
||||
return identity.NewEncryptCheckTransformer()
|
||||
}
|
||||
|
@ -177,37 +177,37 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
||||
// Transforms data using one of them, and tries to untransform using the others.
|
||||
// Repeats this for all possible combinations.
|
||||
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
|
||||
identityFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithIdentityFirst, false, ctx.Done())
|
||||
identityFirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithIdentityFirst, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
|
||||
aesGcmFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesGcmFirst, false, ctx.Done())
|
||||
aesGcmFirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithAesGcmFirst, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
|
||||
aesCbcFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesCbcFirst, false, ctx.Done())
|
||||
aesCbcFirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithAesCbcFirst, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
|
||||
}
|
||||
|
||||
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
|
||||
secretboxFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithSecretboxFirst, false, ctx.Done())
|
||||
secretboxFirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithSecretboxFirst, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
|
||||
kmsFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSFirst, false, ctx.Done())
|
||||
kmsFirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithKMSFirst, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
|
||||
kmsv2FirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSv2First, false, ctx.Done())
|
||||
kmsv2FirstEncryptionConfiguration, err := LoadEncryptionConfig(ctx, correctConfigWithKMSv2First, false)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
|
||||
}
|
||||
@ -460,7 +460,7 @@ func TestKMSMaxTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
_, _, kmsUsed, _ := getTransformerOverridesAndKMSPluginHealthzCheckers(&testCase.config, testContext(t).Done())
|
||||
_, _, kmsUsed, _ := getTransformerOverridesAndKMSPluginHealthzCheckers(testContext(t), &testCase.config)
|
||||
if kmsUsed == nil {
|
||||
t.Fatal("kmsUsed should not be nil")
|
||||
}
|
||||
@ -547,7 +547,7 @@ func TestKMSPluginHealthz(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
_, got, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(config, testContext(t).Done())
|
||||
_, got, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(testContext(t), config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -757,7 +757,7 @@ func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath strin
|
||||
ctx := testContext(t)
|
||||
|
||||
t.Helper()
|
||||
encryptionConfiguration, err := LoadEncryptionConfig(encryptionConfigPath, false, ctx.Done())
|
||||
encryptionConfiguration, err := LoadEncryptionConfig(ctx, encryptionConfigPath, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -49,27 +49,22 @@ type DynamicKMSEncryptionConfigContent struct {
|
||||
|
||||
// dynamicTransformers updates the transformers when encryption config file changes.
|
||||
dynamicTransformers *encryptionconfig.DynamicTransformers
|
||||
|
||||
// stopCh used here is a lifecycle signal of genericapiserver already drained while shutting down.
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
// NewDynamicKMSEncryptionConfiguration returns controller that dynamically reacts to changes in encryption config file.
|
||||
func NewDynamicKMSEncryptionConfiguration(
|
||||
// NewDynamicEncryptionConfiguration returns controller that dynamically reacts to changes in encryption config file.
|
||||
func NewDynamicEncryptionConfiguration(
|
||||
name, filePath string,
|
||||
dynamicTransformers *encryptionconfig.DynamicTransformers,
|
||||
configContentHash string,
|
||||
stopCh <-chan struct{},
|
||||
) *DynamicKMSEncryptionConfigContent {
|
||||
encryptionConfig := &DynamicKMSEncryptionConfigContent{
|
||||
name: name,
|
||||
filePath: filePath,
|
||||
lastLoadedEncryptionConfigHash: configContentHash,
|
||||
dynamicTransformers: dynamicTransformers,
|
||||
stopCh: stopCh,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-hot-reload", name)),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
|
||||
}
|
||||
encryptionConfig.queue.Add(workqueueKey)
|
||||
encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run
|
||||
|
||||
return encryptionConfig
|
||||
}
|
||||
@ -83,21 +78,21 @@ func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
|
||||
defer klog.InfoS("Shutting down controller", "name", d.name)
|
||||
|
||||
// start worker for processing content
|
||||
go wait.Until(d.runWorker, time.Second, ctx.Done())
|
||||
go wait.UntilWithContext(ctx, d.runWorker, time.Second)
|
||||
|
||||
// start the loop that watches the encryption config file until stopCh is closed.
|
||||
go wait.Until(func() {
|
||||
if err := d.watchEncryptionConfigFile(ctx.Done()); err != nil {
|
||||
go wait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||
if err := d.watchEncryptionConfigFile(ctx); err != nil {
|
||||
// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
|
||||
defer d.queue.Add(workqueueKey)
|
||||
klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
|
||||
}
|
||||
}, time.Second, ctx.Done())
|
||||
}, time.Second)
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(stopCh <-chan struct{}) error {
|
||||
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(ctx context.Context) error {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating fsnotify watcher: %w", err)
|
||||
@ -116,7 +111,7 @@ func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(stopCh <-c
|
||||
}
|
||||
case err := <-watcher.Errors:
|
||||
return fmt.Errorf("received fsnotify error: %w", err)
|
||||
case <-stopCh:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -142,13 +137,13 @@ func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Even
|
||||
}
|
||||
|
||||
// runWorker to process file content
|
||||
func (d *DynamicKMSEncryptionConfigContent) runWorker() {
|
||||
for d.processNextWorkItem() {
|
||||
func (d *DynamicKMSEncryptionConfigContent) runWorker(ctx context.Context) {
|
||||
for d.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem processes file content when there is a message in the queue.
|
||||
func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem() bool {
|
||||
func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx context.Context) bool {
|
||||
// key here is dummy item in the queue to trigger file content processing.
|
||||
key, quit := d.queue.Get()
|
||||
if quit {
|
||||
@ -163,12 +158,15 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem() bool {
|
||||
configChanged bool
|
||||
)
|
||||
|
||||
// get context to close the new transformers.
|
||||
ctx, closeTransformers := wait.ContextForChannel(d.stopCh)
|
||||
// get context to close the new transformers (on error cases and on the next reload)
|
||||
// serverCtx is attached to the API server's lifecycle so we will always close transformers on shut down
|
||||
ctx, closeTransformers := context.WithCancel(serverCtx)
|
||||
|
||||
defer func() {
|
||||
// TODO: increment success metric when updatedEffectiveConfig=true
|
||||
|
||||
// TODO can work queue metrics help here?
|
||||
|
||||
if !updatedEffectiveConfig {
|
||||
// avoid leaking if we're not using the newly constructed transformers (due to an error or them not being changed)
|
||||
closeTransformers()
|
||||
@ -222,7 +220,7 @@ func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.
|
||||
err error,
|
||||
) {
|
||||
// this code path will only execute if reload=true. So passing true explicitly.
|
||||
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(d.filePath, true, ctx.Done())
|
||||
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(ctx, d.filePath, true)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@ -247,7 +245,12 @@ func (d *DynamicKMSEncryptionConfigContent) validateNewTransformersHealth(
|
||||
kmsPluginCloseGracePeriod = 10 * time.Second
|
||||
}
|
||||
|
||||
pollErr := wait.PollImmediate(100*time.Millisecond, kmsPluginCloseGracePeriod, func() (bool, error) {
|
||||
// really make sure that the immediate check does not hang
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, kmsPluginCloseGracePeriod)
|
||||
defer cancel()
|
||||
|
||||
pollErr := wait.PollImmediateWithContext(ctx, 100*time.Millisecond, kmsPluginCloseGracePeriod, func(ctx context.Context) (bool, error) {
|
||||
// create a fake http get request to health check endpoint
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("/healthz/%s", kmsPluginHealthzCheck.Name()), nil)
|
||||
if err != nil {
|
||||
|
@ -41,12 +41,11 @@ func TestProcessEncryptionConfig(t *testing.T) {
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
d := NewDynamicKMSEncryptionConfiguration(
|
||||
d := NewDynamicEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testCase.filePath,
|
||||
nil,
|
||||
"",
|
||||
ctx.Done(),
|
||||
)
|
||||
|
||||
_, _, err := d.processEncryptionConfig(ctx)
|
||||
@ -103,19 +102,19 @@ func TestWatchEncryptionConfigFile(t *testing.T) {
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
testFilePath := copyFileForTest(t, "testdata/ec_config.yaml")
|
||||
|
||||
d := NewDynamicKMSEncryptionConfiguration(
|
||||
d := NewDynamicEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testFilePath,
|
||||
nil,
|
||||
"",
|
||||
ctx.Done(),
|
||||
)
|
||||
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
err := d.watchEncryptionConfigFile(d.stopCh)
|
||||
err := d.watchEncryptionConfigFile(ctx)
|
||||
errs <- err
|
||||
}()
|
||||
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
kmsconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
@ -231,7 +231,7 @@ func (s *EtcdOptions) Complete(
|
||||
ctxTransformers, closeTransformers := wait.ContextForChannel(stopCh)
|
||||
ctxServer, _ := wait.ContextForChannel(stopCh) // explicitly ignore cancel here because we do not own the server's lifecycle
|
||||
|
||||
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload, ctxTransformers.Done())
|
||||
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(ctxTransformers, s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload)
|
||||
if err != nil {
|
||||
// in case of error, we want to close partially initialized (if any) transformers
|
||||
closeTransformers()
|
||||
@ -249,23 +249,19 @@ func (s *EtcdOptions) Complete(
|
||||
|
||||
dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod)
|
||||
|
||||
s.resourceTransformers = dynamicTransformers
|
||||
s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers}
|
||||
|
||||
// add post start hook to start hot reload controller
|
||||
// adding this hook here will ensure that it gets configured exactly once
|
||||
err = addPostStartHook(
|
||||
"start-encryption-provider-config-automatic-reload",
|
||||
func(hookContext server.PostStartHookContext) error {
|
||||
kmsConfigController := kmsconfigcontroller.NewDynamicKMSEncryptionConfiguration(
|
||||
"kms-encryption-config",
|
||||
func(_ server.PostStartHookContext) error {
|
||||
dynamicEncryptionConfigController := encryptionconfigcontroller.NewDynamicEncryptionConfiguration(
|
||||
"encryption-provider-config-automatic-reload-controller",
|
||||
s.EncryptionProviderConfigFilepath,
|
||||
dynamicTransformers,
|
||||
encryptionConfiguration.EncryptionFileContentHash,
|
||||
ctxServer.Done(),
|
||||
)
|
||||
|
||||
go kmsConfigController.Run(ctxServer)
|
||||
go dynamicEncryptionConfigController.Run(ctxServer)
|
||||
|
||||
return nil
|
||||
},
|
||||
@ -275,6 +271,9 @@ func (s *EtcdOptions) Complete(
|
||||
closeTransformers()
|
||||
return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err)
|
||||
}
|
||||
|
||||
s.resourceTransformers = dynamicTransformers
|
||||
s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers}
|
||||
} else {
|
||||
s.resourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers)
|
||||
s.kmsPluginHealthzChecks = encryptionConfiguration.HealthChecks
|
||||
|
Loading…
Reference in New Issue
Block a user