mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
feat: updates encryption config file watch logic to polling
Signed-off-by: Nilekh Chaudhari <1626598+nilekhc@users.noreply.github.com> fix (#2) Signed-off-by: Monis Khan <mok@microsoft.com>
This commit is contained in:
parent
095786913d
commit
e95b7c6d8b
@ -504,6 +504,24 @@ func (h *kmsv2PluginProbe) isKMSv2ProviderHealthyAndMaybeRotateDEK(ctx context.C
|
||||
|
||||
// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
|
||||
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
|
||||
data, contentHash, err := loadDataAndHash(filepath)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error while loading file: %w", err)
|
||||
}
|
||||
|
||||
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
|
||||
}
|
||||
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
|
||||
if !ok {
|
||||
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
|
||||
return config, contentHash, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||
}
|
||||
|
||||
func loadDataAndHash(filepath string) ([]byte, string, error) {
|
||||
f, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
|
||||
@ -518,16 +536,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
|
||||
return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
|
||||
}
|
||||
|
||||
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err)
|
||||
}
|
||||
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
|
||||
if !ok {
|
||||
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
return data, computeEncryptionConfigHash(data), nil
|
||||
}
|
||||
|
||||
return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||
// GetEncryptionConfigHash reads the encryption configuration file at filepath and returns the hash of the file.
|
||||
// It does not attempt to decode or load the config, and serves as a cheap check to determine if the file has changed.
|
||||
func GetEncryptionConfigHash(filepath string) (string, error) {
|
||||
_, contentHash, err := loadDataAndHash(filepath)
|
||||
return contentHash, err
|
||||
}
|
||||
|
||||
// prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config.
|
||||
|
@ -2177,3 +2177,48 @@ func logLines(logs string) []string {
|
||||
}
|
||||
return lines
|
||||
}
|
||||
|
||||
func TestGetEncryptionConfigHash(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
filepath string
|
||||
wantHash string
|
||||
wantErr string
|
||||
}{
|
||||
{
|
||||
name: "empty config file content",
|
||||
filepath: "testdata/invalid-configs/kms/invalid-content.yaml",
|
||||
wantHash: "",
|
||||
wantErr: `encryption provider configuration file "testdata/invalid-configs/kms/invalid-content.yaml" is empty`,
|
||||
},
|
||||
{
|
||||
name: "missing file",
|
||||
filepath: "testdata/invalid-configs/kms/file-that-does-not-exist.yaml",
|
||||
wantHash: "",
|
||||
wantErr: `error opening encryption provider configuration file "testdata/invalid-configs/kms/file-that-does-not-exist.yaml": open testdata/invalid-configs/kms/file-that-does-not-exist.yaml: no such file or directory`,
|
||||
},
|
||||
{
|
||||
name: "valid file",
|
||||
filepath: "testdata/valid-configs/secret-box-first.yaml",
|
||||
wantHash: "c638c0327dbc3276dd1fcf3e67895d19ebca16b91ae0d19af24ef0759b8e0f66",
|
||||
wantErr: ``,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
got, err := GetEncryptionConfigHash(tt.filepath)
|
||||
if errString(err) != tt.wantErr {
|
||||
t.Errorf("GetEncryptionConfigHash() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if got != tt.wantHash {
|
||||
t.Errorf("GetEncryptionConfigHash() got = %v, want %v", got, tt.wantHash)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
@ -35,6 +35,9 @@ import (
|
||||
// workqueueKey is the dummy key used to process change in encryption config file.
|
||||
const workqueueKey = "key"
|
||||
|
||||
// EncryptionConfigFileChangePollDuration is exposed so that integration tests can crank up the reload speed.
|
||||
var EncryptionConfigFileChangePollDuration = time.Minute
|
||||
|
||||
// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
|
||||
type DynamicKMSEncryptionConfigContent struct {
|
||||
name string
|
||||
@ -53,6 +56,10 @@ type DynamicKMSEncryptionConfigContent struct {
|
||||
|
||||
// identity of the api server
|
||||
apiServerID string
|
||||
|
||||
// can be swapped during testing
|
||||
getEncryptionConfigHash func(ctx context.Context, filepath string) (string, error)
|
||||
loadEncryptionConfig func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error)
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -73,77 +80,57 @@ func NewDynamicEncryptionConfiguration(
|
||||
dynamicTransformers: dynamicTransformers,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
|
||||
apiServerID: apiServerID,
|
||||
getEncryptionConfigHash: func(_ context.Context, filepath string) (string, error) {
|
||||
return encryptionconfig.GetEncryptionConfigHash(filepath)
|
||||
},
|
||||
loadEncryptionConfig: encryptionconfig.LoadEncryptionConfig,
|
||||
}
|
||||
encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run
|
||||
|
||||
return encryptionConfig
|
||||
}
|
||||
|
||||
// Run starts the controller and blocks until stopCh is closed.
|
||||
// Run starts the controller and blocks until ctx is canceled.
|
||||
func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer d.queue.ShutDown()
|
||||
|
||||
klog.InfoS("Starting controller", "name", d.name)
|
||||
defer klog.InfoS("Shutting down controller", "name", d.name)
|
||||
|
||||
// start worker for processing content
|
||||
go wait.UntilWithContext(ctx, d.runWorker, time.Second)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// start the loop that watches the encryption config file until stopCh is closed.
|
||||
go wait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||
if err := d.watchEncryptionConfigFile(ctx); err != nil {
|
||||
// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
|
||||
defer d.queue.Add(workqueueKey)
|
||||
klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
|
||||
}
|
||||
}, time.Second)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer wg.Done()
|
||||
defer d.queue.ShutDown()
|
||||
<-ctx.Done()
|
||||
}()
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer wg.Done()
|
||||
d.runWorker(ctx)
|
||||
}()
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(ctx context.Context) error {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating fsnotify watcher: %w", err)
|
||||
}
|
||||
defer watcher.Close()
|
||||
// this function polls changes in the encryption config file by placing a dummy key in the queue.
|
||||
// the 'runWorker' function then picks up this dummy key and processes the changes.
|
||||
// the goroutine terminates when 'ctx' is canceled.
|
||||
_ = wait.PollUntilContextCancel(
|
||||
ctx,
|
||||
EncryptionConfigFileChangePollDuration,
|
||||
true,
|
||||
func(ctx context.Context) (bool, error) {
|
||||
// add dummy item to the queue to trigger file content processing.
|
||||
d.queue.Add(workqueueKey)
|
||||
|
||||
if err = watcher.Add(d.filePath); err != nil {
|
||||
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||
}
|
||||
// return false to continue polling.
|
||||
return false, nil
|
||||
},
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
if err := d.handleWatchEvent(event, watcher); err != nil {
|
||||
return err
|
||||
}
|
||||
case err := <-watcher.Errors:
|
||||
return fmt.Errorf("received fsnotify error: %w", err)
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error {
|
||||
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
|
||||
defer d.queue.Add(workqueueKey)
|
||||
|
||||
// return if file has not been removed or renamed.
|
||||
if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := watcher.Remove(d.filePath); err != nil {
|
||||
klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err)
|
||||
}
|
||||
if err := watcher.Add(d.filePath); err != nil {
|
||||
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// runWorker to process file content
|
||||
@ -161,6 +148,12 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
|
||||
}
|
||||
defer d.queue.Done(key)
|
||||
|
||||
d.processWorkItem(serverCtx, key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey interface{}) {
|
||||
var (
|
||||
updatedEffectiveConfig bool
|
||||
err error
|
||||
@ -188,25 +181,25 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
|
||||
metrics.RecordEncryptionConfigAutomaticReloadFailure(d.apiServerID)
|
||||
utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
|
||||
// add dummy item back to the queue to trigger file content processing.
|
||||
d.queue.AddRateLimited(key)
|
||||
d.queue.AddRateLimited(workqueueKey)
|
||||
}
|
||||
}()
|
||||
|
||||
encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
|
||||
if err != nil {
|
||||
return true
|
||||
return
|
||||
}
|
||||
if !configChanged {
|
||||
return true
|
||||
return
|
||||
}
|
||||
|
||||
if len(encryptionConfiguration.HealthChecks) != 1 {
|
||||
err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
|
||||
return true
|
||||
return
|
||||
}
|
||||
// get healthz checks for all new KMS plugins.
|
||||
if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
|
||||
return true
|
||||
return
|
||||
}
|
||||
|
||||
// update transformers.
|
||||
@ -223,26 +216,37 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex
|
||||
klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)
|
||||
|
||||
updatedEffectiveConfig = true
|
||||
return true
|
||||
}
|
||||
|
||||
// loadEncryptionConfig processes the next set of content from the file.
|
||||
func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
|
||||
encryptionConfiguration *encryptionconfig.EncryptionConfiguration,
|
||||
_ *encryptionconfig.EncryptionConfiguration,
|
||||
configChanged bool,
|
||||
err error,
|
||||
_ error,
|
||||
) {
|
||||
// this code path will only execute if reload=true. So passing true explicitly.
|
||||
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
|
||||
contentHash, err := d.getEncryptionConfigHash(ctx, d.filePath)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// check if encryptionConfig is different from the current. Do nothing if they are the same.
|
||||
if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
|
||||
klog.V(4).InfoS("Encryption config has not changed", "name", d.name)
|
||||
if contentHash == d.lastLoadedEncryptionConfigHash {
|
||||
klog.V(4).InfoS("Encryption config has not changed (before load)", "name", d.name)
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// this code path will only execute if reload=true. So passing true explicitly.
|
||||
encryptionConfiguration, err := d.loadEncryptionConfig(ctx, d.filePath, true, d.apiServerID)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// check if encryptionConfig is different from the current (again to avoid TOCTOU). Do nothing if they are the same.
|
||||
if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
|
||||
klog.V(4).InfoS("Encryption config has not changed (after load)", "name", d.name)
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
return encryptionConfiguration, true, nil
|
||||
}
|
||||
|
||||
|
@ -18,156 +18,374 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
)
|
||||
|
||||
func TestProcessEncryptionConfig(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
filePath string
|
||||
expectError bool
|
||||
func TestController(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
|
||||
|
||||
const expectedSuccessMetricValue = `
|
||||
# HELP apiserver_encryption_config_controller_automatic_reload_success_total [ALPHA] Total number of successful automatic reloads of encryption configuration split by apiserver identity.
|
||||
# TYPE apiserver_encryption_config_controller_automatic_reload_success_total counter
|
||||
apiserver_encryption_config_controller_automatic_reload_success_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1
|
||||
`
|
||||
const expectedFailureMetricValue = `
|
||||
# HELP apiserver_encryption_config_controller_automatic_reload_failures_total [ALPHA] Total number of failed automatic reloads of encryption configuration split by apiserver identity.
|
||||
# TYPE apiserver_encryption_config_controller_automatic_reload_failures_total counter
|
||||
apiserver_encryption_config_controller_automatic_reload_failures_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1
|
||||
`
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
wantECFileHash string
|
||||
wantTransformerClosed bool
|
||||
wantLoadCalls int
|
||||
wantHashCalls int
|
||||
wantAddRateLimitedCount uint64
|
||||
wantMetrics string
|
||||
mockLoadEncryptionConfig func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error)
|
||||
mockGetEncryptionConfigHash func(ctx context.Context, filepath string) (string, error)
|
||||
}{
|
||||
{
|
||||
name: "empty config file",
|
||||
filePath: "testdata/empty_config.yaml",
|
||||
expectError: true,
|
||||
name: "when invalid config is provided previous config shouldn't be changed",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: expectedFailureMetricValue,
|
||||
wantAddRateLimitedCount: 1,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return nil, fmt.Errorf("empty config file")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when new valid config is provided it should be updated",
|
||||
wantECFileHash: "some new config hash",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantMetrics: expectedSuccessMetricValue,
|
||||
wantAddRateLimitedCount: 0,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "valid-plugin",
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
EncryptionFileContentHash: "some new config hash",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when same valid config is provided previous config shouldn't be changed",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: "",
|
||||
wantAddRateLimitedCount: 0,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "valid-plugin",
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
// hash of initial "testdata/ec_config.yaml" config file before reloading
|
||||
EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when transformer's health check fails previous config shouldn't be changed",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: expectedFailureMetricValue,
|
||||
wantAddRateLimitedCount: 1,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "invalid-plugin",
|
||||
err: fmt.Errorf("mockingly failing"),
|
||||
},
|
||||
},
|
||||
KMSCloseGracePeriod: time.Second,
|
||||
EncryptionFileContentHash: "anything different",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when multiple health checks are present previous config shouldn't be changed",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: expectedFailureMetricValue,
|
||||
wantAddRateLimitedCount: 1,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "valid-plugin",
|
||||
err: nil,
|
||||
},
|
||||
&mockHealthChecker{
|
||||
pluginName: "another-valid-plugin",
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
EncryptionFileContentHash: "anything different",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when invalid health check URL is provided previous config shouldn't be changed",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: expectedFailureMetricValue,
|
||||
wantAddRateLimitedCount: 1,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "invalid\nname",
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
EncryptionFileContentHash: "anything different",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when config is not updated transformers are closed correctly",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 1,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: "",
|
||||
wantAddRateLimitedCount: 0,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "always changes and never errors", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return &encryptionconfig.EncryptionConfiguration{
|
||||
HealthChecks: []healthz.HealthChecker{
|
||||
&mockHealthChecker{
|
||||
pluginName: "valid-plugin",
|
||||
err: nil,
|
||||
},
|
||||
},
|
||||
// hash of initial "testdata/ec_config.yaml" config file before reloading
|
||||
EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when config hash is not updated transformers are closed correctly",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 0,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: "",
|
||||
wantAddRateLimitedCount: 0,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
// hash of initial "testdata/ec_config.yaml" config file before reloading
|
||||
return "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", nil
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return nil, fmt.Errorf("should not be called")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "when config hash errors transformers are closed correctly",
|
||||
wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3",
|
||||
wantLoadCalls: 0,
|
||||
wantHashCalls: 1,
|
||||
wantTransformerClosed: true,
|
||||
wantMetrics: expectedFailureMetricValue,
|
||||
wantAddRateLimitedCount: 1,
|
||||
mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) {
|
||||
return "", fmt.Errorf("some io error")
|
||||
},
|
||||
mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
return nil, fmt.Errorf("should not be called")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
d := NewDynamicEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testCase.filePath,
|
||||
nil,
|
||||
"",
|
||||
"",
|
||||
)
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
serverCtx, closeServer := context.WithCancel(context.Background())
|
||||
t.Cleanup(closeServer)
|
||||
|
||||
_, _, err := d.processEncryptionConfig(ctx)
|
||||
if testCase.expectError && err == nil {
|
||||
t.Fatalf("expected error but got none")
|
||||
legacyregistry.Reset()
|
||||
|
||||
// load initial encryption config
|
||||
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(
|
||||
serverCtx,
|
||||
"testdata/ec_config.yaml",
|
||||
true,
|
||||
"test-apiserver",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load encryption config: %v", err)
|
||||
}
|
||||
if !testCase.expectError && err != nil {
|
||||
t.Fatalf("expected no error but got %v", err)
|
||||
|
||||
d := NewDynamicEncryptionConfiguration(
|
||||
"test-controller",
|
||||
"does not matter",
|
||||
encryptionconfig.NewDynamicTransformers(
|
||||
encryptionConfiguration.Transformers,
|
||||
encryptionConfiguration.HealthChecks[0],
|
||||
closeServer,
|
||||
encryptionConfiguration.KMSCloseGracePeriod,
|
||||
),
|
||||
encryptionConfiguration.EncryptionFileContentHash,
|
||||
"test-apiserver",
|
||||
)
|
||||
d.queue.ShutDown() // we do not use the real queue during tests
|
||||
|
||||
queue := &mockWorkQueue{
|
||||
addCalled: make(chan struct{}),
|
||||
cancel: closeServer,
|
||||
}
|
||||
d.queue = queue
|
||||
|
||||
var hashCalls, loadCalls int
|
||||
d.loadEncryptionConfig = func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) {
|
||||
loadCalls++
|
||||
queue.ctx = ctx
|
||||
return test.mockLoadEncryptionConfig(ctx, filepath, reload, apiServerID)
|
||||
}
|
||||
d.getEncryptionConfigHash = func(ctx context.Context, filepath string) (string, error) {
|
||||
hashCalls++
|
||||
queue.ctx = ctx
|
||||
return test.mockGetEncryptionConfigHash(ctx, filepath)
|
||||
}
|
||||
|
||||
d.Run(serverCtx) // this should block and run exactly one iteration of the worker loop
|
||||
|
||||
if test.wantECFileHash != d.lastLoadedEncryptionConfigHash {
|
||||
t.Errorf("expected encryption config hash %q but got %q", test.wantECFileHash, d.lastLoadedEncryptionConfigHash)
|
||||
}
|
||||
|
||||
if test.wantLoadCalls != loadCalls {
|
||||
t.Errorf("load calls does not match: want=%v, got=%v", test.wantLoadCalls, loadCalls)
|
||||
}
|
||||
|
||||
if test.wantHashCalls != hashCalls {
|
||||
t.Errorf("hash calls does not match: want=%v, got=%v", test.wantHashCalls, hashCalls)
|
||||
}
|
||||
|
||||
if test.wantTransformerClosed != queue.wasCanceled {
|
||||
t.Errorf("transformer closed does not match: want=%v, got=%v", test.wantTransformerClosed, queue.wasCanceled)
|
||||
}
|
||||
|
||||
if test.wantAddRateLimitedCount != queue.addRateLimitedCount.Load() {
|
||||
t.Errorf("queue addRateLimitedCount does not match: want=%v, got=%v", test.wantAddRateLimitedCount, queue.addRateLimitedCount.Load())
|
||||
}
|
||||
|
||||
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics),
|
||||
"apiserver_encryption_config_controller_automatic_reload_success_total",
|
||||
"apiserver_encryption_config_controller_automatic_reload_failures_total",
|
||||
); err != nil {
|
||||
t.Errorf("failed to validate metrics: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchEncryptionConfigFile(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
generateEvent func(filePath string, cancel context.CancelFunc)
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "file not renamed or removed",
|
||||
expectError: false,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
os.Chtimes(filePath, time.Now(), time.Now())
|
||||
type mockWorkQueue struct {
|
||||
workqueue.RateLimitingInterface // will panic if any unexpected method is called
|
||||
|
||||
// wait for the event to be handled
|
||||
time.Sleep(1 * time.Second)
|
||||
cancel()
|
||||
os.Remove(filePath)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "file renamed",
|
||||
expectError: true,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
os.Rename(filePath, filePath+"1")
|
||||
closeOnce sync.Once
|
||||
addCalled chan struct{}
|
||||
|
||||
// wait for the event to be handled
|
||||
time.Sleep(1 * time.Second)
|
||||
os.Remove(filePath + "1")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "file removed",
|
||||
expectError: true,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
// allow watcher handle to start
|
||||
time.Sleep(1 * time.Second)
|
||||
os.Remove(filePath)
|
||||
},
|
||||
},
|
||||
}
|
||||
count atomic.Uint64
|
||||
ctx context.Context
|
||||
wasCanceled bool
|
||||
cancel func()
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
testFilePath := copyFileForTest(t, "testdata/ec_config.yaml")
|
||||
addRateLimitedCount atomic.Uint64
|
||||
}
|
||||
|
||||
d := NewDynamicEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testFilePath,
|
||||
nil,
|
||||
"",
|
||||
"",
|
||||
)
|
||||
func (m *mockWorkQueue) Done(item interface{}) {
|
||||
m.count.Add(1)
|
||||
m.wasCanceled = m.ctx.Err() != nil
|
||||
m.cancel()
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
err := d.watchEncryptionConfigFile(ctx)
|
||||
errs <- err
|
||||
}()
|
||||
func (m *mockWorkQueue) Get() (item interface{}, shutdown bool) {
|
||||
<-m.addCalled
|
||||
|
||||
testCase.generateEvent(d.filePath, cancel)
|
||||
|
||||
err := <-errs
|
||||
if testCase.expectError && err == nil {
|
||||
t.Fatalf("expected error but got none")
|
||||
}
|
||||
if !testCase.expectError && err != nil {
|
||||
t.Fatalf("expected no error but got %v", err)
|
||||
}
|
||||
})
|
||||
switch m.count.Load() {
|
||||
case 0:
|
||||
return nil, false
|
||||
case 1:
|
||||
return nil, true
|
||||
default:
|
||||
panic("too many calls to Get")
|
||||
}
|
||||
}
|
||||
|
||||
func copyFileForTest(t *testing.T, srcFilePath string) string {
|
||||
t.Helper()
|
||||
|
||||
// get directory from source file path
|
||||
srcDir := filepath.Dir(srcFilePath)
|
||||
|
||||
// get file name from source file path
|
||||
srcFileName := filepath.Base(srcFilePath)
|
||||
|
||||
// set new file path
|
||||
dstFilePath := filepath.Join(srcDir, "test_"+srcFileName)
|
||||
|
||||
// copy src file to dst file
|
||||
r, err := os.Open(srcFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open source file: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
w, err := os.Create(dstFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create destination file: %v", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// copy the file
|
||||
_, err = io.Copy(w, r)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to copy file: %v", err)
|
||||
}
|
||||
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to close destination file: %v", err)
|
||||
}
|
||||
|
||||
return dstFilePath
|
||||
func (m *mockWorkQueue) Add(item interface{}) {
|
||||
m.closeOnce.Do(func() {
|
||||
close(m.addCalled)
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) ShutDown() {}
|
||||
func (m *mockWorkQueue) AddRateLimited(item interface{}) { m.addRateLimitedCount.Add(1) }
|
||||
|
||||
type mockHealthChecker struct {
|
||||
pluginName string
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockHealthChecker) Check(req *http.Request) error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
func (m *mockHealthChecker) Name() string {
|
||||
return m.pluginName
|
||||
}
|
||||
|
@ -47,6 +47,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
|
||||
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1"
|
||||
@ -308,6 +309,9 @@ resources:
|
||||
func TestEncryptionConfigHotReload(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
|
||||
|
||||
// this makes the test super responsive. It's set to a default of 1 minute.
|
||||
encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
|
||||
|
||||
storageConfig := framework.SharedEtcd()
|
||||
encryptionConfig := `
|
||||
kind: EncryptionConfiguration
|
||||
@ -407,7 +411,7 @@ resources:
|
||||
|
||||
// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
|
||||
// wait for config to be observed
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test)
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, "", test)
|
||||
|
||||
// run storage migration
|
||||
// get secrets
|
||||
@ -477,6 +481,10 @@ resources:
|
||||
}
|
||||
|
||||
// remove old KMS provider
|
||||
// verifyIfKMSTransformersSwapped sometimes passes even before the changes in the encryption config file are observed.
|
||||
// this causes the metrics tests to fail, which validate two config changes.
|
||||
// this may happen when an existing KMS provider is already running (e.g., new-kms-provider-for-secrets in this case).
|
||||
// to ensure that the changes are observed, we added one more provider (kms-provider-to-encrypt-all) and are validating it in verifyIfKMSTransformersSwapped.
|
||||
encryptionConfigWithoutOldProvider := `
|
||||
kind: EncryptionConfiguration
|
||||
apiVersion: apiserver.config.k8s.io/v1
|
||||
@ -495,13 +503,25 @@ resources:
|
||||
name: new-kms-provider-for-configmaps
|
||||
cachesize: 1000
|
||||
endpoint: unix:///@new-kms-provider.sock
|
||||
- resources:
|
||||
- '*.*'
|
||||
providers:
|
||||
- kms:
|
||||
name: kms-provider-to-encrypt-all
|
||||
cachesize: 1000
|
||||
endpoint: unix:///@new-encrypt-all-kms-provider.sock
|
||||
- identity: {}
|
||||
`
|
||||
// start new KMS Plugin
|
||||
_ = mock.NewBase64Plugin(t, "@new-encrypt-all-kms-provider.sock")
|
||||
|
||||
// update encryption config and wait for hot reload
|
||||
updateFile(t, test.configDir, encryptionConfigFileName, []byte(encryptionConfigWithoutOldProvider))
|
||||
|
||||
wantPrefixForEncryptAll := "k8s:enc:kms:v1:kms-provider-to-encrypt-all:"
|
||||
|
||||
// wait for config to be observed
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test)
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, wantPrefixForEncryptAll, test)
|
||||
|
||||
// confirm that reading secrets still works
|
||||
_, err = test.restClient.CoreV1().Secrets(testNamespace).Get(
|
||||
@ -801,9 +821,12 @@ resources:
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncryptionConfigHotReloadFileWatch(t *testing.T) {
|
||||
func TestEncryptionConfigHotReloadFilePolling(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)()
|
||||
|
||||
// this makes the test super responsive. It's set to a default of 1 minute.
|
||||
encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second
|
||||
|
||||
testCases := []struct {
|
||||
sleep time.Duration
|
||||
name string
|
||||
@ -948,7 +971,7 @@ resources:
|
||||
func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transformTest) {
|
||||
// implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod.
|
||||
// wait for config to be observed
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefix, test)
|
||||
verifyIfKMSTransformersSwapped(t, wantPrefix, "", test)
|
||||
|
||||
// run storage migration
|
||||
secretsList, err := test.restClient.CoreV1().Secrets("").List(
|
||||
@ -982,7 +1005,7 @@ func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transfo
|
||||
}
|
||||
}
|
||||
|
||||
func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *transformTest) {
|
||||
func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix, wantPrefixForEncryptAll string, test *transformTest) {
|
||||
t.Helper()
|
||||
|
||||
var swapErr error
|
||||
@ -1013,6 +1036,29 @@ func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *trans
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if wantPrefixForEncryptAll != "" {
|
||||
deploymentName := fmt.Sprintf("deployment-%d", idx)
|
||||
_, err := test.createDeployment(deploymentName, "default")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test secret, error: %v", err)
|
||||
}
|
||||
|
||||
rawEnvelope, err := test.readRawRecordFromETCD(test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default"))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read %s from etcd: %v", test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default"), err)
|
||||
}
|
||||
|
||||
// check prefix
|
||||
if !bytes.HasPrefix(rawEnvelope.Kvs[0].Value, []byte(wantPrefixForEncryptAll)) {
|
||||
idx++
|
||||
|
||||
swapErr = fmt.Errorf("expected deployment to be prefixed with %s, but got %s", wantPrefixForEncryptAll, rawEnvelope.Kvs[0].Value)
|
||||
|
||||
// return nil error to continue polling till timeout
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
|
Loading…
Reference in New Issue
Block a user