diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index 8fb0ae76825..79243993939 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -376,13 +376,18 @@ func (h *kmsv2PluginProbe) rotateDEKOnKeyIDChange(ctx context.Context, statusKey ExpirationTimestamp: expirationTimestamp, CacheKey: cacheKey, }) - klog.V(6).InfoS("successfully rotated DEK", - "uid", uid, - "newKeyID", resp.KeyID, - "oldKeyID", state.KeyID, - "expirationTimestamp", expirationTimestamp.Format(time.RFC3339), - ) - return nil + + // it should be logically impossible for the new state to be invalid but check just in case + _, errGen = h.getCurrentState() + if errGen == nil { + klog.V(6).InfoS("successfully rotated DEK", + "uid", uid, + "newKeyID", resp.KeyID, + "oldKeyID", state.KeyID, + "expirationTimestamp", expirationTimestamp.Format(time.RFC3339), + ) + return nil + } } return fmt.Errorf("failed to rotate DEK uid=%q, errState=%v, errGen=%v, statusKeyID=%q, encryptKeyID=%q, stateKeyID=%q, expirationTimestamp=%s", @@ -710,44 +715,8 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig // initialize state so that Load always works probe.state.Store(&envelopekmsv2.State{}) - runProbeCheckAndLog := func(ctx context.Context) error { - if err := probe.check(ctx); err != nil { - klog.VDepth(1, 2).ErrorS(err, "kms plugin failed health check probe", "name", kmsName) - return err - } - return nil - } + primeAndProbeKMSv2(ctx, probe, kmsName) - // on the happy path where the plugin is healthy and available on server start, - // prime keyID and DEK by running the check inline once (this also prevents unit tests from flaking) - // ignore the error here since we want to support the plugin starting up async with the API server - _ = 
runProbeCheckAndLog(ctx) - // make sure that the plugin's key ID is reasonably up-to-date - // also, make sure that our DEK is up-to-date to with said key ID (if it expires the server will fail all writes) - // if this background loop ever stops running, the server will become unfunctional after kmsv2PluginWriteDEKMaxTTL - go wait.PollUntilWithContext( - ctx, - kmsv2PluginHealthzPositiveInterval, - func(ctx context.Context) (bool, error) { - if err := runProbeCheckAndLog(ctx); err == nil { - return false, nil - } - - // TODO add integration test for quicker error poll on failure - // if we fail, block the outer polling and start a new quicker poll inline - // this limits the chance that our DEK expires during a transient failure - _ = wait.PollUntilWithContext( - ctx, - kmsv2PluginHealthzNegativeInterval, - func(ctx context.Context) (bool, error) { - return runProbeCheckAndLog(ctx) == nil, nil - }, - ) - - return false, nil - }) - - // using AES-GCM by default for encrypting data with KMSv2 transformer := storagevalue.PrefixTransformer{ Transformer: envelopekmsv2.NewEnvelopeTransformer(envelopeService, kmsName, probe.getCurrentState), Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"), @@ -763,6 +732,56 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig } } +func primeAndProbeKMSv2(ctx context.Context, probe *kmsv2PluginProbe, kmsName string) { + runProbeCheckAndLog := func(ctx context.Context, depth int) error { + if err := probe.check(ctx); err != nil { + klog.VDepth(1+depth, 2).ErrorS(err, "kms plugin failed health check probe", "name", kmsName) + return err + } + return nil + } + + blockAndProbeFastUntilSuccess := func(ctx context.Context) { + _ = wait.PollUntilWithContext( + ctx, + kmsv2PluginHealthzNegativeInterval, + func(ctx context.Context) (bool, error) { + return runProbeCheckAndLog(ctx, 1) == nil, nil + }, + ) + } + + // on the happy path where the plugin is healthy and available on server start, + // prime keyID and 
DEK by running the check inline once (this also prevents unit tests from flaking) + errPrime := runProbeCheckAndLog(ctx, 0) + + // if our initial attempt to prime failed, start trying to get to a valid state in the background ASAP + // this prevents a slow start when the external healthz checker is configured to ignore the KMS healthz endpoint + // since we want to support the plugin starting up async with the API server, this error is not fatal + if errPrime != nil { + go blockAndProbeFastUntilSuccess(ctx) // separate goroutine to avoid blocking + } + + // make sure that the plugin's key ID is reasonably up-to-date + // also, make sure that our DEK is up-to-date with said key ID (if it expires the server will fail all writes) + // if this background loop ever stops running, the server will become nonfunctional after kmsv2PluginWriteDEKMaxTTL + go wait.PollUntilWithContext( + ctx, + kmsv2PluginHealthzPositiveInterval, + func(ctx context.Context) (bool, error) { + if err := runProbeCheckAndLog(ctx, 0); err == nil { + return false, nil + } + + // TODO add integration test for quicker error poll on failure + // if we fail, block the outer polling and start a new quicker poll inline + // this limits the chance that our DEK expires during a transient failure + blockAndProbeFastUntilSuccess(ctx) + + return false, nil + }) +} + func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelope.Service, prefix string) storagevalue.PrefixTransformer { baseTransformerFunc := func(block cipher.Block) (storagevalue.Transformer, error) { gcm, err := aestransformer.NewGCMTransformer(block) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go index 003441da329..bac46f3f162 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go +++ 
b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go @@ -1703,7 +1703,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { { name: "happy path, with previous state", service: &testKMSv2EnvelopeService{err: fmt.Errorf("broken")}, // not called - state: validState("2", now), + state: validState(t, "2", now), statusKeyID: "2", wantState: envelopekmsv2.State{ KeyID: "2", @@ -1716,7 +1716,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { { name: "previous state expired but key ID matches", service: &testKMSv2EnvelopeService{err: fmt.Errorf("broken")}, // not called - state: validState("3", now.Add(-time.Hour)), + state: validState(t, "3", now.Add(-time.Hour)), statusKeyID: "3", wantState: envelopekmsv2.State{ KeyID: "3", @@ -1729,7 +1729,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { { name: "previous state expired but key ID does not match", service: &testKMSv2EnvelopeService{keyID: "4"}, - state: validState("3", now.Add(-time.Hour)), + state: validState(t, "3", now.Add(-time.Hour)), statusKeyID: "4", wantState: envelopekmsv2.State{ KeyID: "4", @@ -1746,7 +1746,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { { name: "service down but key ID does not match", service: &testKMSv2EnvelopeService{err: fmt.Errorf("broken")}, - state: validState("4", now.Add(7*time.Minute)), + state: validState(t, "4", now.Add(7*time.Minute)), statusKeyID: "5", wantState: envelopekmsv2.State{ KeyID: "4", @@ -1778,7 +1778,7 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { { name: "invalid service response, with previous state", service: &testKMSv2EnvelopeService{keyID: "3", encryptAnnotations: map[string][]byte{"panda": nil}}, - state: validState("2", now), + state: validState(t, "2", now), statusKeyID: "3", wantState: envelopekmsv2.State{ KeyID: "2", @@ -1831,17 +1831,52 @@ func Test_kmsv2PluginProbe_rotateDEKOnKeyIDChange(t *testing.T) { if errString(err) != 
tt.wantErr { t.Errorf("rotateDEKOnKeyIDChange() error = %v, wantErr %v", err, tt.wantErr) } + + // if the old or new state is valid, we should be able to use it + if _, stateErr := h.getCurrentState(); stateErr == nil || err == nil { + transformer := envelopekmsv2.NewEnvelopeTransformer( + &testKMSv2EnvelopeService{err: fmt.Errorf("broken")}, // not called + "panda", + h.getCurrentState, + ) + + dataCtx := value.DefaultContext(sampleContextText) + originalText := []byte(sampleText) + + transformedData, err := transformer.TransformToStorage(ctx, originalText, dataCtx) + if err != nil { + t.Fatal(err) + } + + untransformedData, stale, err := transformer.TransformFromStorage(ctx, transformedData, dataCtx) + if err != nil { + t.Fatal(err) + } + if stale { + t.Error("unexpected stale data") + } + if !bytes.Equal(untransformedData, originalText) { + t.Fatalf("incorrect transformation, want: %v, got: %v", originalText, untransformedData) + } + } }) } } -func validState(keyID string, exp time.Time) envelopekmsv2.State { +func validState(t *testing.T, keyID string, exp time.Time) envelopekmsv2.State { + t.Helper() + + transformer, resp, cacheKey, err := envelopekmsv2.GenerateTransformer(testContext(t), "", &testKMSv2EnvelopeService{keyID: keyID}) + if err != nil { + t.Fatal(err) + } return envelopekmsv2.State{ - Transformer: &resourceTransformer{}, - EncryptedDEK: []byte{1}, - KeyID: keyID, + Transformer: transformer, + EncryptedDEK: resp.Ciphertext, + KeyID: resp.KeyID, + Annotations: resp.Annotations, ExpirationTimestamp: exp, - CacheKey: []byte{1}, + CacheKey: cacheKey, } }