diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index a7b35153966..5b3da51faf8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -504,6 +504,24 @@ func (h *kmsv2PluginProbe) isKMSv2ProviderHealthyAndMaybeRotateDEK(ctx context.C // loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file. func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) { + data, contentHash, err := loadDataAndHash(filepath) + if err != nil { + return nil, "", fmt.Errorf("error while loading file: %w", err) + } + + configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil) + if err != nil { + return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err) + } + config, ok := configObj.(*apiserverconfig.EncryptionConfiguration) + if !ok { + return nil, "", fmt.Errorf("got unexpected config type: %v", gvk) + } + + return config, contentHash, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate() +} + +func loadDataAndHash(filepath string) ([]byte, string, error) { f, err := os.Open(filepath) if err != nil { return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err) @@ -518,16 +536,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath) } - configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil) - if err != nil { - return nil, "", fmt.Errorf("error decoding encryption provider configuration file %q: %w", filepath, err) - } - config, ok := configObj.(*apiserverconfig.EncryptionConfiguration) - if !ok { - return nil, "", fmt.Errorf("got unexpected config type: %v", gvk) - } + return data, computeEncryptionConfigHash(data), nil +} - return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate() +// GetEncryptionConfigHash reads the encryption configuration file at filepath and returns the hash of the file. +// It does not attempt to decode or load the config, and serves as a cheap check to determine if the file has changed. +func GetEncryptionConfigHash(filepath string) (string, error) { + _, contentHash, err := loadDataAndHash(filepath) + return contentHash, err } // prefixTransformersAndProbes creates the set of transformers and KMS probes based on the given resource config. diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go index 3a81e397d91..c5ce81cf401 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go @@ -2177,3 +2177,48 @@ func logLines(logs string) []string { } return lines } + +func TestGetEncryptionConfigHash(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + filepath string + wantHash string + wantErr string + }{ + { + name: "empty config file content", + filepath: "testdata/invalid-configs/kms/invalid-content.yaml", + wantHash: "", + wantErr: `encryption provider configuration file "testdata/invalid-configs/kms/invalid-content.yaml" is empty`, + }, + { + name: "missing file", + filepath: "testdata/invalid-configs/kms/file-that-does-not-exist.yaml", + wantHash: "", + wantErr: `error opening encryption provider configuration file "testdata/invalid-configs/kms/file-that-does-not-exist.yaml": open testdata/invalid-configs/kms/file-that-does-not-exist.yaml: no such file or directory`, + }, + { + name: "valid file", + filepath: "testdata/valid-configs/secret-box-first.yaml", + wantHash: "c638c0327dbc3276dd1fcf3e67895d19ebca16b91ae0d19af24ef0759b8e0f66", + wantErr: ``, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := GetEncryptionConfigHash(tt.filepath) + if errString(err) != tt.wantErr { + t.Errorf("GetEncryptionConfigHash() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.wantHash { + t.Errorf("GetEncryptionConfigHash() got = %v, want %v", got, tt.wantHash) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller.go index 5bb90995b9e..94ea21c4331 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller.go @@ -20,9 +20,9 @@ import ( "context" "fmt" "net/http" + "sync" "time" - "github.com/fsnotify/fsnotify" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/healthz" @@ -35,6 +35,9 @@ import ( // workqueueKey is the dummy key used to process change in encryption config file. const workqueueKey = "key" +// EncryptionConfigFileChangePollDuration is exposed so that integration tests can crank up the reload speed. +var EncryptionConfigFileChangePollDuration = time.Minute + // DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file. type DynamicKMSEncryptionConfigContent struct { name string @@ -53,6 +56,10 @@ type DynamicKMSEncryptionConfigContent struct { // identity of the api server apiServerID string + + // can be swapped during testing + getEncryptionConfigHash func(ctx context.Context, filepath string) (string, error) + loadEncryptionConfig func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) } func init() { @@ -73,77 +80,57 @@ func NewDynamicEncryptionConfiguration( dynamicTransformers: dynamicTransformers, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), apiServerID: apiServerID, + getEncryptionConfigHash: func(_ context.Context, filepath string) (string, error) { + return encryptionconfig.GetEncryptionConfigHash(filepath) + }, + loadEncryptionConfig: encryptionconfig.LoadEncryptionConfig, } encryptionConfig.queue.Add(workqueueKey) // to avoid missing any file changes that occur in between the initial load and Run return encryptionConfig } -// Run starts the controller and blocks until stopCh is closed. +// Run starts the controller and blocks until ctx is canceled. func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) { defer utilruntime.HandleCrash() - defer d.queue.ShutDown() klog.InfoS("Starting controller", "name", d.name) defer klog.InfoS("Shutting down controller", "name", d.name) - // start worker for processing content - go wait.UntilWithContext(ctx, d.runWorker, time.Second) + var wg sync.WaitGroup - // start the loop that watches the encryption config file until stopCh is closed. - go wait.UntilWithContext(ctx, func(ctx context.Context) { - if err := d.watchEncryptionConfigFile(ctx); err != nil { - // if there is an error while setting up or handling the watches, this will ensure that we will process the config file. - defer d.queue.Add(workqueueKey) - klog.ErrorS(err, "Failed to watch encryption config file, will retry later") - } - }, time.Second) + wg.Add(1) + go func() { + defer utilruntime.HandleCrash() + defer wg.Done() + defer d.queue.ShutDown() + <-ctx.Done() + }() - <-ctx.Done() -} + wg.Add(1) + go func() { + defer utilruntime.HandleCrash() + defer wg.Done() + d.runWorker(ctx) + }() -func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(ctx context.Context) error { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return fmt.Errorf("error creating fsnotify watcher: %w", err) - } - defer watcher.Close() + // this function polls changes in the encryption config file by placing a dummy key in the queue. + // the 'runWorker' function then picks up this dummy key and processes the changes. + // the goroutine terminates when 'ctx' is canceled. + _ = wait.PollUntilContextCancel( + ctx, + EncryptionConfigFileChangePollDuration, + true, + func(ctx context.Context) (bool, error) { + // add dummy item to the queue to trigger file content processing. + d.queue.Add(workqueueKey) - if err = watcher.Add(d.filePath); err != nil { - return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err) - } + // return false to continue polling. + return false, nil + }, + ) - for { - select { - case event := <-watcher.Events: - if err := d.handleWatchEvent(event, watcher); err != nil { - return err - } - case err := <-watcher.Errors: - return fmt.Errorf("received fsnotify error: %w", err) - case <-ctx.Done(): - return nil - } - } -} - -func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error { - // This should be executed after restarting the watch (if applicable) to ensure no file event will be missing. - defer d.queue.Add(workqueueKey) - - // return if file has not been removed or renamed. - if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 { - return nil - } - - if err := watcher.Remove(d.filePath); err != nil { - klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err) - } - if err := watcher.Add(d.filePath); err != nil { - return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err) - } - - return nil + wg.Wait() } // runWorker to process file content @@ -161,6 +148,12 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex } defer d.queue.Done(key) + d.processWorkItem(serverCtx, key) + + return true +} + +func (d *DynamicKMSEncryptionConfigContent) processWorkItem(serverCtx context.Context, workqueueKey interface{}) { var ( updatedEffectiveConfig bool err error @@ -188,25 +181,25 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex metrics.RecordEncryptionConfigAutomaticReloadFailure(d.apiServerID) utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err)) // add dummy item back to the queue to trigger file content processing. - d.queue.AddRateLimited(key) + d.queue.AddRateLimited(workqueueKey) } }() encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx) if err != nil { - return true + return } if !configChanged { - return true + return } if len(encryptionConfiguration.HealthChecks) != 1 { err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks)) - return true + return } // get healthz checks for all new KMS plugins. if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil { - return true + return } // update transformers. @@ -223,26 +216,37 @@ func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem(serverCtx contex klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name) updatedEffectiveConfig = true - return true } // loadEncryptionConfig processes the next set of content from the file. func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) ( - encryptionConfiguration *encryptionconfig.EncryptionConfiguration, + _ *encryptionconfig.EncryptionConfiguration, configChanged bool, - err error, + _ error, ) { - // this code path will only execute if reload=true. So passing true explicitly. - encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(ctx, d.filePath, true, d.apiServerID) + contentHash, err := d.getEncryptionConfigHash(ctx, d.filePath) if err != nil { return nil, false, err } // check if encryptionConfig is different from the current. Do nothing if they are the same. - if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash { - klog.V(4).InfoS("Encryption config has not changed", "name", d.name) + if contentHash == d.lastLoadedEncryptionConfigHash { + klog.V(4).InfoS("Encryption config has not changed (before load)", "name", d.name) return nil, false, nil } + + // this code path will only execute if reload=true. So passing true explicitly. + encryptionConfiguration, err := d.loadEncryptionConfig(ctx, d.filePath, true, d.apiServerID) + if err != nil { + return nil, false, err + } + + // check if encryptionConfig is different from the current (again to avoid TOCTOU). Do nothing if they are the same. + if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash { + klog.V(4).InfoS("Encryption config has not changed (after load)", "name", d.name) + return nil, false, nil + } + return encryptionConfiguration, true, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller_test.go index c2a007c408a..da3c4047f53 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/controller_test.go @@ -18,156 +18,374 @@ package controller import ( "context" - "io" - "os" - "path/filepath" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" "testing" "time" + + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/apiserver/pkg/server/options/encryptionconfig" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" ) -func TestProcessEncryptionConfig(t *testing.T) { - testCases := []struct { - name string - filePath string - expectError bool +func TestController(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)() + + const expectedSuccessMetricValue = ` +# HELP apiserver_encryption_config_controller_automatic_reload_success_total [ALPHA] Total number of successful automatic reloads of encryption configuration split by apiserver identity. +# TYPE apiserver_encryption_config_controller_automatic_reload_success_total counter +apiserver_encryption_config_controller_automatic_reload_success_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1 +` + const expectedFailureMetricValue = ` +# HELP apiserver_encryption_config_controller_automatic_reload_failures_total [ALPHA] Total number of failed automatic reloads of encryption configuration split by apiserver identity. +# TYPE apiserver_encryption_config_controller_automatic_reload_failures_total counter +apiserver_encryption_config_controller_automatic_reload_failures_total{apiserver_id_hash="sha256:cd8a60cec6134082e9f37e7a4146b4bc14a0bf8a863237c36ec8fdb658c3e027"} 1 +` + + tests := []struct { + name string + wantECFileHash string + wantTransformerClosed bool + wantLoadCalls int + wantHashCalls int + wantAddRateLimitedCount uint64 + wantMetrics string + mockLoadEncryptionConfig func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) + mockGetEncryptionConfigHash func(ctx context.Context, filepath string) (string, error) }{ { - name: "empty config file", - filePath: "testdata/empty_config.yaml", - expectError: true, + name: "when invalid config is provided previous config shouldn't be changed", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: expectedFailureMetricValue, + wantAddRateLimitedCount: 1, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return nil, fmt.Errorf("empty config file") + }, + }, + { + name: "when new valid config is provided it should be updated", + wantECFileHash: "some new config hash", + wantLoadCalls: 1, + wantHashCalls: 1, + wantMetrics: expectedSuccessMetricValue, + wantAddRateLimitedCount: 0, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "valid-plugin", + err: nil, + }, + }, + EncryptionFileContentHash: "some new config hash", + }, nil + }, + }, + { + name: "when same valid config is provided previous config shouldn't be changed", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: "", + wantAddRateLimitedCount: 0, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "valid-plugin", + err: nil, + }, + }, + // hash of initial "testdata/ec_config.yaml" config file before reloading + EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + }, nil + }, + }, + { + name: "when transformer's health check fails previous config shouldn't be changed", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: expectedFailureMetricValue, + wantAddRateLimitedCount: 1, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "invalid-plugin", + err: fmt.Errorf("mockingly failing"), + }, + }, + KMSCloseGracePeriod: time.Second, + EncryptionFileContentHash: "anything different", + }, nil + }, + }, + { + name: "when multiple health checks are present previous config shouldn't be changed", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: expectedFailureMetricValue, + wantAddRateLimitedCount: 1, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "valid-plugin", + err: nil, + }, + &mockHealthChecker{ + pluginName: "another-valid-plugin", + err: nil, + }, + }, + EncryptionFileContentHash: "anything different", + }, nil + }, + }, + { + name: "when invalid health check URL is provided previous config shouldn't be changed", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: expectedFailureMetricValue, + wantAddRateLimitedCount: 1, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "invalid\nname", + err: nil, + }, + }, + EncryptionFileContentHash: "anything different", + }, nil + }, + }, + { + name: "when config is not updated transformers are closed correctly", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 1, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: "", + wantAddRateLimitedCount: 0, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "always changes and never errors", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return &encryptionconfig.EncryptionConfiguration{ + HealthChecks: []healthz.HealthChecker{ + &mockHealthChecker{ + pluginName: "valid-plugin", + err: nil, + }, + }, + // hash of initial "testdata/ec_config.yaml" config file before reloading + EncryptionFileContentHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + }, nil + }, + }, + { + name: "when config hash is not updated transformers are closed correctly", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 0, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: "", + wantAddRateLimitedCount: 0, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + // hash of initial "testdata/ec_config.yaml" config file before reloading + return "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", nil + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return nil, fmt.Errorf("should not be called") + }, + }, + { + name: "when config hash errors transformers are closed correctly", + wantECFileHash: "6bc9f4aa2e5587afbb96074e1809550cbc4de3cc3a35717dac8ff2800a147fd3", + wantLoadCalls: 0, + wantHashCalls: 1, + wantTransformerClosed: true, + wantMetrics: expectedFailureMetricValue, + wantAddRateLimitedCount: 1, + mockGetEncryptionConfigHash: func(ctx context.Context, filepath string) (string, error) { + return "", fmt.Errorf("some io error") + }, + mockLoadEncryptionConfig: func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + return nil, fmt.Errorf("should not be called") + }, }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - ctx := context.Background() - d := NewDynamicEncryptionConfiguration( - testCase.name, - testCase.filePath, - nil, - "", - "", - ) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + serverCtx, closeServer := context.WithCancel(context.Background()) + t.Cleanup(closeServer) - _, _, err := d.processEncryptionConfig(ctx) - if testCase.expectError && err == nil { - t.Fatalf("expected error but got none") + legacyregistry.Reset() + + // load initial encryption config + encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig( + serverCtx, + "testdata/ec_config.yaml", + true, + "test-apiserver", + ) + if err != nil { + t.Fatalf("failed to load encryption config: %v", err) } - if !testCase.expectError && err != nil { - t.Fatalf("expected no error but got %v", err) + + d := NewDynamicEncryptionConfiguration( + "test-controller", + "does not matter", + encryptionconfig.NewDynamicTransformers( + encryptionConfiguration.Transformers, + encryptionConfiguration.HealthChecks[0], + closeServer, + encryptionConfiguration.KMSCloseGracePeriod, + ), + encryptionConfiguration.EncryptionFileContentHash, + "test-apiserver", + ) + d.queue.ShutDown() // we do not use the real queue during tests + + queue := &mockWorkQueue{ + addCalled: make(chan struct{}), + cancel: closeServer, + } + d.queue = queue + + var hashCalls, loadCalls int + d.loadEncryptionConfig = func(ctx context.Context, filepath string, reload bool, apiServerID string) (*encryptionconfig.EncryptionConfiguration, error) { + loadCalls++ + queue.ctx = ctx + return test.mockLoadEncryptionConfig(ctx, filepath, reload, apiServerID) + } + d.getEncryptionConfigHash = func(ctx context.Context, filepath string) (string, error) { + hashCalls++ + queue.ctx = ctx + return test.mockGetEncryptionConfigHash(ctx, filepath) + } + + d.Run(serverCtx) // this should block and run exactly one iteration of the worker loop + + if test.wantECFileHash != d.lastLoadedEncryptionConfigHash { + t.Errorf("expected encryption config hash %q but got %q", test.wantECFileHash, d.lastLoadedEncryptionConfigHash) + } + + if test.wantLoadCalls != loadCalls { + t.Errorf("load calls does not match: want=%v, got=%v", test.wantLoadCalls, loadCalls) + } + + if test.wantHashCalls != hashCalls { + t.Errorf("hash calls does not match: want=%v, got=%v", test.wantHashCalls, hashCalls) + } + + if test.wantTransformerClosed != queue.wasCanceled { + t.Errorf("transformer closed does not match: want=%v, got=%v", test.wantTransformerClosed, queue.wasCanceled) + } + + if test.wantAddRateLimitedCount != queue.addRateLimitedCount.Load() { + t.Errorf("queue addRateLimitedCount does not match: want=%v, got=%v", test.wantAddRateLimitedCount, queue.addRateLimitedCount.Load()) + } + + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.wantMetrics), + "apiserver_encryption_config_controller_automatic_reload_success_total", + "apiserver_encryption_config_controller_automatic_reload_failures_total", + ); err != nil { + t.Errorf("failed to validate metrics: %v", err) } }) } } -func TestWatchEncryptionConfigFile(t *testing.T) { - testCases := []struct { - name string - generateEvent func(filePath string, cancel context.CancelFunc) - expectError bool - }{ - { - name: "file not renamed or removed", - expectError: false, - generateEvent: func(filePath string, cancel context.CancelFunc) { - os.Chtimes(filePath, time.Now(), time.Now()) +type mockWorkQueue struct { + workqueue.RateLimitingInterface // will panic if any unexpected method is called - // wait for the event to be handled - time.Sleep(1 * time.Second) - cancel() - os.Remove(filePath) - }, - }, - { - name: "file renamed", - expectError: true, - generateEvent: func(filePath string, cancel context.CancelFunc) { - os.Rename(filePath, filePath+"1") + closeOnce sync.Once + addCalled chan struct{} - // wait for the event to be handled - time.Sleep(1 * time.Second) - os.Remove(filePath + "1") - }, - }, - { - name: "file removed", - expectError: true, - generateEvent: func(filePath string, cancel context.CancelFunc) { - // allow watcher handle to start - time.Sleep(1 * time.Second) - os.Remove(filePath) - }, - }, - } + count atomic.Uint64 + ctx context.Context + wasCanceled bool + cancel func() - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - testFilePath := copyFileForTest(t, "testdata/ec_config.yaml") + addRateLimitedCount atomic.Uint64 +} - d := NewDynamicEncryptionConfiguration( - testCase.name, - testFilePath, - nil, - "", - "", - ) +func (m *mockWorkQueue) Done(item interface{}) { + m.count.Add(1) + m.wasCanceled = m.ctx.Err() != nil + m.cancel() +} - errs := make(chan error, 1) - go func() { - err := d.watchEncryptionConfigFile(ctx) - errs <- err - }() +func (m *mockWorkQueue) Get() (item interface{}, shutdown bool) { + <-m.addCalled - testCase.generateEvent(d.filePath, cancel) - - err := <-errs - if testCase.expectError && err == nil { - t.Fatalf("expected error but got none") - } - if !testCase.expectError && err != nil { - t.Fatalf("expected no error but got %v", err) - } - }) + switch m.count.Load() { + case 0: + return nil, false + case 1: + return nil, true + default: + panic("too many calls to Get") } } -func copyFileForTest(t *testing.T, srcFilePath string) string { - t.Helper() - - // get directory from source file path - srcDir := filepath.Dir(srcFilePath) - - // get file name from source file path - srcFileName := filepath.Base(srcFilePath) - - // set new file path - dstFilePath := filepath.Join(srcDir, "test_"+srcFileName) - - // copy src file to dst file - r, err := os.Open(srcFilePath) - if err != nil { - t.Fatalf("failed to open source file: %v", err) - } - defer r.Close() - - w, err := os.Create(dstFilePath) - if err != nil { - t.Fatalf("failed to create destination file: %v", err) - } - defer w.Close() - - // copy the file - _, err = io.Copy(w, r) - if err != nil { - t.Fatalf("failed to copy file: %v", err) - } - - err = w.Close() - if err != nil { - t.Fatalf("failed to close destination file: %v", err) - } - - return dstFilePath +func (m *mockWorkQueue) Add(item interface{}) { + m.closeOnce.Do(func() { + close(m.addCalled) + }) +} + +func (m *mockWorkQueue) ShutDown() {} +func (m *mockWorkQueue) AddRateLimited(item interface{}) { m.addRateLimitedCount.Add(1) } + +type mockHealthChecker struct { + pluginName string + err error +} + +func (m *mockHealthChecker) Check(req *http.Request) error { + return m.err +} + +func (m *mockHealthChecker) Name() string { + return m.pluginName } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/testdata/empty_config.yaml b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/controller/testdata/empty_config.yaml deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/test/integration/controlplane/transformation/kms_transformation_test.go b/test/integration/controlplane/transformation/kms_transformation_test.go index ebd8c8450cd..df84d7e3e58 100644 --- a/test/integration/controlplane/transformation/kms_transformation_test.go +++ b/test/integration/controlplane/transformation/kms_transformation_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" + encryptionconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller" "k8s.io/apiserver/pkg/storage/value" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1" @@ -308,6 +309,9 @@ resources: func TestEncryptionConfigHotReload(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)() + // this makes the test super responsive. It's set to a default of 1 minute. + encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second + storageConfig := framework.SharedEtcd() encryptionConfig := ` kind: EncryptionConfiguration @@ -407,7 +411,7 @@ resources: // implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod. // wait for config to be observed - verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test) + verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, "", test) // run storage migration // get secrets @@ -477,6 +481,10 @@ resources: } // remove old KMS provider + // verifyIfKMSTransformersSwapped sometimes passes even before the changes in the encryption config file are observed. + // this causes the metrics tests to fail, which validate two config changes. + // this may happen when an existing KMS provider is already running (e.g., new-kms-provider-for-secrets in this case). + // to ensure that the changes are observed, we added one more provider (kms-provider-to-encrypt-all) and are validating it in verifyIfKMSTransformersSwapped. encryptionConfigWithoutOldProvider := ` kind: EncryptionConfiguration apiVersion: apiserver.config.k8s.io/v1 @@ -495,13 +503,25 @@ resources: name: new-kms-provider-for-configmaps cachesize: 1000 endpoint: unix:///@new-kms-provider.sock + - resources: + - '*.*' + providers: + - kms: + name: kms-provider-to-encrypt-all + cachesize: 1000 + endpoint: unix:///@new-encrypt-all-kms-provider.sock + - identity: {} ` + // start new KMS Plugin + _ = mock.NewBase64Plugin(t, "@new-encrypt-all-kms-provider.sock") // update encryption config and wait for hot reload updateFile(t, test.configDir, encryptionConfigFileName, []byte(encryptionConfigWithoutOldProvider)) + wantPrefixForEncryptAll := "k8s:enc:kms:v1:kms-provider-to-encrypt-all:" + // wait for config to be observed - verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, test) + verifyIfKMSTransformersSwapped(t, wantPrefixForSecrets, wantPrefixForEncryptAll, test) // confirm that reading secrets still works _, err = test.restClient.CoreV1().Secrets(testNamespace).Get( @@ -801,9 +821,12 @@ resources: } } -func TestEncryptionConfigHotReloadFileWatch(t *testing.T) { +func TestEncryptionConfigHotReloadFilePolling(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv1, true)() + // this makes the test super responsive. It's set to a default of 1 minute. + encryptionconfigcontroller.EncryptionConfigFileChangePollDuration = time.Second + testCases := []struct { sleep time.Duration name string @@ -948,7 +971,7 @@ resources: func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transformTest) { // implementing this brute force approach instead of fancy channel notification to avoid test specific code in prod. // wait for config to be observed - verifyIfKMSTransformersSwapped(t, wantPrefix, test) + verifyIfKMSTransformersSwapped(t, wantPrefix, "", test) // run storage migration secretsList, err := test.restClient.CoreV1().Secrets("").List( @@ -982,7 +1005,7 @@ func verifyPrefixOfSecretResource(t *testing.T, wantPrefix string, test *transfo } } -func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *transformTest) { +func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix, wantPrefixForEncryptAll string, test *transformTest) { t.Helper() var swapErr error @@ -1013,6 +1036,29 @@ func verifyIfKMSTransformersSwapped(t *testing.T, wantPrefix string, test *trans return false, nil } + if wantPrefixForEncryptAll != "" { + deploymentName := fmt.Sprintf("deployment-%d", idx) + _, err := test.createDeployment(deploymentName, "default") + if err != nil { + t.Fatalf("Failed to create test secret, error: %v", err) + } + + rawEnvelope, err := test.readRawRecordFromETCD(test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default")) + if err != nil { + t.Fatalf("failed to read %s from etcd: %v", test.getETCDPathForResource(test.storageConfig.Prefix, "", "deployments", deploymentName, "default"), err) + } + + // check prefix + if !bytes.HasPrefix(rawEnvelope.Kvs[0].Value, []byte(wantPrefixForEncryptAll)) { + idx++ + + swapErr = fmt.Errorf("expected deployment to be prefixed with %s, but got %s", wantPrefixForEncryptAll, rawEnvelope.Kvs[0].Value) + + // return nil error to continue polling till timeout + return false, nil + } + } + return true, nil }) if pollErr == wait.ErrWaitTimeout {