kmsv2: use status key ID to update staleness of encrypted data
Signed-off-by: Rita Zhang <rita.z.zhang@gmail.com>
This commit is contained in:
parent 7e0923899f
commit 510ac9b391
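At a high level, this change makes the KMS v2 envelope transformer compare the key ID recorded alongside each encrypted object with the latest key ID reported by the plugin's Status() call, and report the data as stale on a mismatch so the apiserver rewrites it on the next write. A minimal sketch of that contract (illustrative names only, not the actual apiserver code):

package main

import (
    "context"
    "fmt"
)

// keyIDGetter mirrors the callback introduced by this commit: it returns the most
// recent key ID observed from the KMS plugin's Status() responses.
type keyIDGetter func(ctx context.Context) (string, error)

// isStale reports whether data encrypted under storedKeyID should be rewritten,
// i.e. whether the plugin has since switched to a different key.
func isStale(ctx context.Context, getKeyID keyIDGetter, storedKeyID string) (bool, error) {
    currentKeyID, err := getKeyID(ctx)
    if err != nil {
        return false, err
    }
    return storedKeyID != currentKeyID, nil
}

func main() {
    // Pretend Status() now reports key "2" while the data was written under key "1".
    getter := func(ctx context.Context) (string, error) { return "2", nil }
    stale, err := isStale(context.Background(), getter, "1")
    fmt.Println(stale, err) // true <nil> -> the storage layer re-encrypts on the next write
}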
@@ -36,6 +36,7 @@ import (
    "k8s.io/apimachinery/pkg/runtime/serializer"
    utilerrors "k8s.io/apimachinery/pkg/util/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+   "k8s.io/apimachinery/pkg/util/wait"
    apiserverconfig "k8s.io/apiserver/pkg/apis/config"
    apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
    "k8s.io/apiserver/pkg/apis/config/validation"
@@ -48,6 +49,7 @@ import (
    "k8s.io/apiserver/pkg/storage/value/encrypt/identity"
    "k8s.io/apiserver/pkg/storage/value/encrypt/secretbox"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
+   "k8s.io/klog/v2"
    kmsservice "k8s.io/kms/service"
)

@@ -57,6 +59,7 @@ const (
    secretboxTransformerPrefixV1 = "k8s:enc:secretbox:v1:"
    kmsTransformerPrefixV1 = "k8s:enc:kms:v1:"
    kmsTransformerPrefixV2 = "k8s:enc:kms:v2:"
+   kmsPluginHealthzInterval = 1 * time.Minute
    kmsPluginHealthzNegativeTTL = 3 * time.Second
    kmsPluginHealthzPositiveTTL = 20 * time.Second
    kmsAPIVersionV1 = "v1"
@@ -84,6 +87,7 @@ type kmsPluginProbe struct {
}

type kmsv2PluginProbe struct {
+   keyID atomic.Pointer[string]
    name string
    ttl time.Duration
    service kmsservice.Service
@@ -272,6 +276,10 @@ func (h *kmsv2PluginProbe) check(ctx context.Context) error {
        h.ttl = kmsPluginHealthzNegativeTTL
        return fmt.Errorf("failed to perform status section of the healthz check for KMS Provider %s, error: %w", h.name, err)
    }
+   // we coast on the last valid key ID that we have observed
+   if err := envelopekmsv2.ValidateKeyID(p.KeyID); err == nil {
+       h.keyID.Store(&p.KeyID)
+   }

    if err := isKMSv2ProviderHealthy(h.name, p); err != nil {
        h.lastResponse = &kmsPluginHealthzResponse{err: err, received: time.Now()}
@@ -284,6 +292,15 @@ func (h *kmsv2PluginProbe) check(ctx context.Context) error {
    return nil
}

+// getCurrentKeyID returns the latest keyID from the last Status() call or err if keyID is empty
+func (h *kmsv2PluginProbe) getCurrentKeyID(ctx context.Context) (string, error) {
+   keyID := *h.keyID.Load()
+   if len(keyID) == 0 {
+       return "", fmt.Errorf("got unexpected empty keyID")
+   }
+   return keyID, nil
+}
+
// isKMSv2ProviderHealthy checks if the KMSv2-Plugin is healthy.
func isKMSv2ProviderHealthy(name string, response *kmsservice.StatusResponse) error {
    var errs []error
@@ -293,7 +310,7 @@ func isKMSv2ProviderHealthy(name string, response *kmsservice.StatusResponse) er
    if response.Version != envelopekmsv2.KMSAPIVersion {
        errs = append(errs, fmt.Errorf("expected KMSv2 API version %s, got %s", envelopekmsv2.KMSAPIVersion, response.Version))
    }
-   if len(response.KeyID) == 0 {
+   if err := envelopekmsv2.ValidateKeyID(response.KeyID); err != nil {
        errs = append(errs, fmt.Errorf("expected KMSv2 KeyID to be set, got %s", response.KeyID))
    }

@@ -561,10 +578,24 @@ func kmsPrefixTransformer(ctx context.Context, config *apiserverconfig.KMSConfig
        l: &sync.Mutex{},
        lastResponse: &kmsPluginHealthzResponse{},
    }
+   // initialize keyID so that Load always works
+   keyID := ""
+   probe.keyID.Store(&keyID)
+
+   // make sure that the plugin's key ID is reasonably up-to-date
+   go wait.PollImmediateUntilWithContext(
+       ctx,
+       kmsPluginHealthzInterval,
+       func(ctx context.Context) (bool, error) {
+           if err := probe.check(ctx); err != nil {
+               klog.V(2).ErrorS(err, "kms plugin failed health check probe", "name", kmsName)
+           }
+           return false, nil
+       })

    // using AES-GCM by default for encrypting data with KMSv2
    transformer := value.PrefixTransformer{
-       Transformer: envelopekmsv2.NewEnvelopeTransformer(envelopeService, int(*config.CacheSize), aestransformer.NewGCMTransformer),
+       Transformer: envelopekmsv2.NewEnvelopeTransformer(envelopeService, probe.getCurrentKeyID, int(*config.CacheSize), aestransformer.NewGCMTransformer),
        Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"),
    }
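The probe keeps the last valid key ID in an atomic.Pointer[string] and seeds it with the empty string so Load never returns nil; the once-a-minute poll writes to it while the transformer reads it on every request without taking the probe's lock. A small standalone sketch of that pattern (not the apiserver code itself):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var keyID atomic.Pointer[string]

    // Seed the pointer so Load always returns a usable value, as kmsPrefixTransformer does.
    empty := ""
    keyID.Store(&empty)

    // Writer side (the background health-check poll): publish the key ID from the latest Status() response.
    latest := "2"
    keyID.Store(&latest)

    // Reader side (getCurrentKeyID, called on every encrypt/decrypt): lock-free load.
    fmt.Println(*keyID.Load()) // "2"
}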
@@ -28,6 +28,7 @@ import (
    "github.com/google/go-cmp/cmp"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
+   "k8s.io/apimachinery/pkg/util/wait"
    apiserverconfig "k8s.io/apiserver/pkg/apis/config"
    "k8s.io/apiserver/pkg/features"
    "k8s.io/apiserver/pkg/storage/value"
@@ -475,6 +476,13 @@ func TestKMSMaxTimeout(t *testing.T) {
func TestKMSPluginHealthz(t *testing.T) {
    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()

+   kmsv2Probe := &kmsv2PluginProbe{
+       name: "foo",
+       ttl: 3 * time.Second,
+   }
+   keyID := "1"
+   kmsv2Probe.keyID.Store(&keyID)
+
    testCases := []struct {
        desc string
        config string
@@ -517,10 +525,7 @@ func TestKMSPluginHealthz(t *testing.T) {
            desc: "Install multiple healthz with v1 and v2",
            config: "testdata/valid-configs/kms/multiple-providers-kmsv2.yaml",
            want: []healthChecker{
-               &kmsv2PluginProbe{
-                   name: "foo",
-                   ttl: 3 * time.Second,
-               },
+               kmsv2Probe,
                &kmsPluginProbe{
                    name: "bar",
                    ttl: 3 * time.Second,
@@ -547,7 +552,9 @@ func TestKMSPluginHealthz(t *testing.T) {
                return
            }

-           _, got, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(testContext(t), config)
+           ctx, cancel := context.WithCancel(context.Background())
+           cancel() // cancel this upfront so the kms v2 healthz check poll only runs once
+           _, got, kmsUsed, err := getTransformerOverridesAndKMSPluginProbes(ctx, config)
            if err != nil {
                t.Fatal(err)
            }
@@ -561,9 +568,11 @@ func TestKMSPluginHealthz(t *testing.T) {
                p.l = nil
                p.lastResponse = nil
            case *kmsv2PluginProbe:
+               waitForOneKMSv2Check(t, p) // make sure the kms v2 healthz check poll is done
                p.service = nil
                p.l = nil
                p.lastResponse = nil
+               p.keyID = kmsv2Probe.keyID
            default:
                t.Fatalf("unexpected probe type %T", p)
            }
@@ -590,6 +599,18 @@ func TestKMSPluginHealthz(t *testing.T) {
    }
}

+func waitForOneKMSv2Check(t *testing.T, p *kmsv2PluginProbe) {
+   t.Helper()
+
+   if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+       p.l.Lock()
+       defer p.l.Unlock()
+       return !p.lastResponse.received.IsZero(), nil
+   }); err != nil {
+       t.Fatal(err)
+   }
+}
+
func TestKMSPluginHealthzTTL(t *testing.T) {
    ctx := testContext(t)
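The new waitForOneKMSv2Check helper shows the synchronization pattern the test relies on: poll under the probe's lock until the background goroutine has recorded at least one response. The same shape works for any test that must wait on a goroutine whose progress is observable behind a mutex; a generic sketch with hypothetical names:

package example

import (
    "sync"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

// waitUntilObserved blocks until observed() reports true, re-checking every 100ms
// under mu, mirroring the waitForOneKMSv2Check helper above (names are illustrative).
func waitUntilObserved(mu *sync.Mutex, observed func() bool) error {
    return wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
        mu.Lock()
        defer mu.Unlock()
        return observed(), nil
    })
}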
@@ -50,9 +50,13 @@ const (
    encryptedDEKMaxSize = 1 * 1024 // 1 kB
)

+type KeyIDGetterFunc func(context.Context) (keyID string, err error)
+
type envelopeTransformer struct {
    envelopeService kmsservice.Service

+   keyIDGetter KeyIDGetterFunc
+
    // transformers is a thread-safe LRU cache which caches decrypted DEKs indexed by their encrypted form.
    transformers *lru.Cache

@@ -67,7 +71,7 @@ type envelopeTransformer struct {
// It uses envelopeService to encrypt and decrypt DEKs. Respective DEKs (in encrypted form) are prepended to
// the data items they encrypt. A cache (of size cacheSize) is maintained to store the most recently
// used decrypted DEKs in memory.
-func NewEnvelopeTransformer(envelopeService kmsservice.Service, cacheSize int, baseTransformerFunc func(cipher.Block) value.Transformer) value.Transformer {
+func NewEnvelopeTransformer(envelopeService kmsservice.Service, keyIDGetter KeyIDGetterFunc, cacheSize int, baseTransformerFunc func(cipher.Block) value.Transformer) value.Transformer {
    var cache *lru.Cache

    if cacheSize > 0 {
@@ -78,6 +82,7 @@ func NewEnvelopeTransformer(envelopeService kmsservice.Service, cacheSize int, b

    return &envelopeTransformer{
        envelopeService: envelopeService,
+       keyIDGetter: keyIDGetter,
        transformers: cache,
        baseTransformerFunc: baseTransformerFunc,
        cacheEnabled: cacheSize > 0,
@@ -118,7 +123,21 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
        }
    }

-   return transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
+   out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
+   if err != nil {
+       return nil, false, err
+   }
+   if stale {
+       return out, stale, nil
+   }
+
+   // Check keyID freshness in addition to data staleness
+   keyID, err := t.keyIDGetter(ctx)
+   if err != nil {
+       return nil, false, err
+   }
+   return out, encryptedObject.KeyID != keyID, nil
+
}

// TransformToStorage encrypts data to be written to disk using envelope encryption.
@@ -130,7 +149,7 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
    }

    uid := string(uuid.NewUUID())
-   klog.V(6).InfoS("Encrypting content using envelope service", "uid", uid, "key", string(dataCtx.AuthenticatedData()))
+   klog.V(6).InfoS("encrypting content using envelope service", "uid", uid, "key", string(dataCtx.AuthenticatedData()))
    resp, err := t.envelopeService.Encrypt(ctx, uid, newKey)
    if err != nil {
        return nil, fmt.Errorf("failed to encrypt DEK, error: %w", err)
@@ -153,6 +172,12 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
        Annotations: resp.Annotations,
    }

+   // Check keyID freshness and write to log if key IDs are different
+   statusKeyID, err := t.keyIDGetter(ctx)
+   if err == nil && encObject.KeyID != statusKeyID {
+       klog.V(2).InfoS("observed different key IDs when encrypting content using kms v2 envelope service", "uid", uid, "encObject.KeyID", encObject.KeyID, "statusKeyID", statusKeyID)
+   }
+
    // Serialize the EncryptedObject to a byte array.
    return t.doEncode(encObject)
}
@@ -231,7 +256,7 @@ func validateEncryptedObject(o *kmstypes.EncryptedObject) error {
    if err := validateEncryptedDEK(o.EncryptedDEK); err != nil {
        return fmt.Errorf("failed to validate encrypted DEK: %w", err)
    }
-   if err := validateKeyID(o.KeyID); err != nil {
+   if err := ValidateKeyID(o.KeyID); err != nil {
        return fmt.Errorf("failed to validate key id: %w", err)
    }
    if err := validateAnnotations(o.Annotations); err != nil {
@@ -271,10 +296,10 @@ func validateAnnotations(annotations map[string][]byte) error {
    return utilerrors.NewAggregate(errs)
}

-// validateKeyID tests the following:
+// ValidateKeyID tests the following:
// 1. The keyID is not empty.
// 2. The size of keyID is less than 1 kB.
-func validateKeyID(keyID string) error {
+func ValidateKeyID(keyID string) error {
    if len(keyID) == 0 {
        return fmt.Errorf("keyID is empty")
    }
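For context on how the new stale return value gets used: the caller of the transformer (in the apiserver, the etcd3 storage layer) decrypts with TransformFromStorage and, when stale comes back true, re-encrypts and writes the object back even if nothing else changed. A hedged sketch of such a consumer, not the store's actual code:

package example

import (
    "context"

    "k8s.io/apiserver/pkg/storage/value"
)

// maybeRewrite illustrates how a caller can react to the transformer's stale flag:
// keep the stored bytes when they are still current, otherwise re-encrypt under the
// current key so etcd ends up with fresh ciphertext.
func maybeRewrite(ctx context.Context, t value.Transformer, stored []byte, dataCtx value.Context) ([]byte, error) {
    plain, stale, err := t.TransformFromStorage(ctx, stored, dataCtx)
    if err != nil {
        return nil, err
    }
    if !stale {
        return stored, nil
    }
    return t.TransformToStorage(ctx, plain, dataCtx)
}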
@@ -37,6 +37,7 @@ const (
    testText = "abcdefghijklmnopqrstuvwxyz"
    testContextText = "0123456789"
    testEnvelopeCacheSize = 10
+   testKeyVersion = "1"
)

// testEnvelopeService is a mock Envelope service which can be used to simulate remote Envelope services
@@ -100,7 +101,7 @@ func (t *testEnvelopeService) Rotate() {

func newTestEnvelopeService() *testEnvelopeService {
    return &testEnvelopeService{
-       keyVersion: "1",
+       keyVersion: testKeyVersion,
    }
}

@@ -133,7 +134,11 @@ func TestEnvelopeCaching(t *testing.T) {
    for _, tt := range testCases {
        t.Run(tt.desc, func(t *testing.T) {
            envelopeService := newTestEnvelopeService()
-           envelopeTransformer := NewEnvelopeTransformer(envelopeService, tt.cacheSize, aestransformer.NewGCMTransformer)
+           envelopeTransformer := NewEnvelopeTransformer(envelopeService,
+               func(ctx context.Context) (string, error) {
+                   return "", nil
+               },
+               tt.cacheSize, aestransformer.NewGCMTransformer)
            ctx := context.Background()
            dataCtx := value.DefaultContext([]byte(testContextText))
            originalText := []byte(testText)
@@ -173,7 +178,12 @@ func TestEnvelopeCaching(t *testing.T) {

// Makes Envelope transformer hit cache limit, throws error if it misbehaves.
func TestEnvelopeCacheLimit(t *testing.T) {
-   envelopeTransformer := NewEnvelopeTransformer(newTestEnvelopeService(), testEnvelopeCacheSize, aestransformer.NewGCMTransformer)
+   envelopeTransformer := NewEnvelopeTransformer(newTestEnvelopeService(),
+       func(ctx context.Context) (string, error) {
+           return "", nil
+       },
+       testEnvelopeCacheSize, aestransformer.NewGCMTransformer)

    ctx := context.Background()
    dataCtx := value.DefaultContext([]byte(testContextText))

@@ -205,6 +215,75 @@ func TestEnvelopeCacheLimit(t *testing.T) {
    }
}

+// Test keyIDGetter as part of envelopeTransformer, throws error if returned err or staleness is incorrect.
+func TestEnvelopeTransformerKeyIDGetter(t *testing.T) {
+   t.Parallel()
+   testCases := []struct {
+       desc string
+       expectedStale bool
+       testErr error
+       testKeyID string
+   }{
+       {
+           desc: "keyIDGetter returns err",
+           expectedStale: false,
+           testErr: fmt.Errorf("failed to perform status section of the healthz check for KMS Provider"),
+           testKeyID: "",
+       },
+       {
+           desc: "keyIDGetter returns same keyID",
+           expectedStale: false,
+           testErr: nil,
+           testKeyID: testKeyVersion,
+       },
+       {
+           desc: "keyIDGetter returns different keyID",
+           expectedStale: true,
+           testErr: nil,
+           testKeyID: "2",
+       },
+   }
+
+   for _, tt := range testCases {
+       tt := tt
+       t.Run(tt.desc, func(t *testing.T) {
+           t.Parallel()
+           envelopeService := newTestEnvelopeService()
+           envelopeTransformer := NewEnvelopeTransformer(envelopeService,
+               func(ctx context.Context) (string, error) {
+                   return tt.testKeyID, tt.testErr
+               },
+               0, aestransformer.NewGCMTransformer)
+
+           ctx := context.Background()
+           dataCtx := value.DefaultContext([]byte(testContextText))
+           originalText := []byte(testText)
+
+           transformedData, err := envelopeTransformer.TransformToStorage(ctx, originalText, dataCtx)
+           if err != nil {
+               t.Fatalf("envelopeTransformer: error while transforming data (%v) to storage: %s", originalText, err)
+           }
+
+           _, stale, err := envelopeTransformer.TransformFromStorage(ctx, transformedData, dataCtx)
+           if tt.testErr != nil {
+               if err == nil {
+                   t.Fatalf("envelopeTransformer: expected error: %v, got nil", tt.testErr)
+               }
+               if err.Error() != tt.testErr.Error() {
+                   t.Fatalf("envelopeTransformer: expected error: %v, got: %v", tt.testErr, err)
+               }
+           } else {
+               if err != nil {
+                   t.Fatalf("envelopeTransformer: unexpected error: %v", err)
+               }
+               if stale != tt.expectedStale {
+                   t.Fatalf("envelopeTransformer TransformFromStorage determined keyID staleness incorrectly, expected: %v, got %v", tt.expectedStale, stale)
+               }
+           }
+       })
+   }
+}
+
func TestTransformToStorageError(t *testing.T) {
    t.Parallel()
    testCases := []struct {
@@ -238,7 +317,11 @@ func TestTransformToStorageError(t *testing.T) {
            t.Parallel()
            envelopeService := newTestEnvelopeService()
            envelopeService.SetAnnotations(tt.annotations)
-           envelopeTransformer := NewEnvelopeTransformer(envelopeService, 0, aestransformer.NewGCMTransformer)
+           envelopeTransformer := NewEnvelopeTransformer(envelopeService,
+               func(ctx context.Context) (string, error) {
+                   return "", nil
+               },
+               0, aestransformer.NewGCMTransformer)
            ctx := context.Background()
            dataCtx := value.DefaultContext([]byte(testContextText))

@@ -445,7 +528,7 @@ func TestValidateKeyID(t *testing.T) {
        tt := tt
        t.Run(tt.name, func(t *testing.T) {
            t.Parallel()
-           err := validateKeyID(tt.keyID)
+           err := ValidateKeyID(tt.keyID)
            if tt.expectedError != "" {
                if err == nil {
                    t.Fatalf("expected error %q, got nil", tt.expectedError)
@@ -57,6 +57,7 @@ type Base64Plugin struct {
    inFailedState bool
    ver string
    socketPath string
+   keyID string
}

// NewBase64Plugin is a constructor for Base64Plugin.
@@ -67,6 +68,7 @@ func NewBase64Plugin(socketPath string) (*Base64Plugin, error) {
        mu: &sync.Mutex{},
        ver: kmsapiVersion,
        socketPath: socketPath,
+       keyID: "1",
    }

    kmsapi.RegisterKeyManagementServiceServer(server, result)
@@ -89,6 +91,24 @@ func WaitForBase64PluginToBeUp(plugin *Base64Plugin) error {
    return nil
}

+// WaitForBase64PluginToBeUpdated waits until the plugin updates keyID.
+func WaitForBase64PluginToBeUpdated(plugin *Base64Plugin) error {
+   var gRPCErr error
+   var resp *kmsapi.StatusResponse
+
+   updatePollErr := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+       resp, gRPCErr = plugin.Status(context.Background(), &kmsapi.StatusRequest{})
+       klog.InfoS("WaitForBase64PluginToBeUpdated", "keyID", resp.KeyId)
+       return gRPCErr == nil && resp.Healthz == "ok" && resp.KeyId == "2", nil
+   })
+
+   if updatePollErr != nil {
+       return fmt.Errorf("failed to update keyID for kmsv2-plugin, error: %w", gRPCErr)
+   }
+
+   return nil
+}
+
// LastEncryptRequest returns the last EncryptRequest.Plain sent to the plugin.
func (s *Base64Plugin) LastEncryptRequest() []byte {
    return s.lastEncryptRequest.Plaintext
@@ -135,6 +155,14 @@ func (s *Base64Plugin) ExitFailedState() {
    s.inFailedState = false
}

+// Update keyID for the plugin.
+func (s *Base64Plugin) UpdateKeyID() {
+   klog.Infof("updating keyID")
+   s.mu.Lock()
+   defer s.mu.Unlock()
+   s.keyID = "2"
+}
+
// Status returns the status of the kms-plugin.
func (s *Base64Plugin) Status(ctx context.Context, request *kmsapi.StatusRequest) (*kmsapi.StatusResponse, error) {
    klog.Infof("Received request for Status: %v", request)
@@ -145,7 +173,7 @@ func (s *Base64Plugin) Status(ctx context.Context, request *kmsapi.StatusRequest
        return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
    }

-   return &kmsapi.StatusResponse{Version: s.ver, Healthz: "ok", KeyId: "1"}, nil
+   return &kmsapi.StatusResponse{Version: s.ver, Healthz: "ok", KeyId: s.keyID}, nil
}

// Decrypt performs base64 decoding of the payload of kms.DecryptRequest.
@@ -187,5 +215,5 @@ func (s *Base64Plugin) Encrypt(ctx context.Context, request *kmsapi.EncryptReque
    buf := make([]byte, base64.StdEncoding.EncodedLen(len(request.Plaintext)))
    base64.StdEncoding.Encode(buf, request.Plaintext)

-   return &kmsapi.EncryptResponse{Ciphertext: buf, KeyId: "1", Annotations: map[string][]byte{"local-kek.kms.kubernetes.io": []byte("encrypted-local-kek")}}, nil
+   return &kmsapi.EncryptResponse{Ciphertext: buf, KeyId: s.keyID, Annotations: map[string][]byte{"local-kek.kms.kubernetes.io": []byte("encrypted-local-kek")}}, nil
}
@@ -36,6 +36,7 @@ import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/uuid"
+   "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apiserver/pkg/features"
    "k8s.io/apiserver/pkg/server/options/encryptionconfig"
    "k8s.io/apiserver/pkg/storage/value"
@@ -208,6 +209,121 @@ resources:
    }
}

+// TestKMSv2ProviderKeyIDStaleness is an integration test between KubeAPI and KMSv2 Plugin
+// Concretely, this test verifies the following contracts for no-op updates:
+// 1. When the key ID is unchanged, the resource version must not change
+// 2. When the key ID changes, the resource version changes (but only once)
+// 3. For all subsequent updates, the resource version must not change
+// 4. When kms-plugin is down, expect creation of new pod and encryption to fail
+// 5. when kms-plugin is down, no-op update for a pod should succeed and not result in RV change
+func TestKMSv2ProviderKeyIDStaleness(t *testing.T) {
+   defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
+
+   encryptionConfig := `
+kind: EncryptionConfiguration
+apiVersion: apiserver.config.k8s.io/v1
+resources:
+- resources:
+  - pods
+  providers:
+  - kms:
+      apiVersion: v2
+      name: kms-provider
+      cachesize: 1000
+      endpoint: unix:///@kms-provider.sock
+`
+   pluginMock, err := kmsv2mock.NewBase64Plugin("@kms-provider.sock")
+   if err != nil {
+       t.Fatalf("failed to create mock of KMSv2 Plugin: %v", err)
+   }
+
+   go pluginMock.Start()
+   if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock); err != nil {
+       t.Fatalf("Failed start plugin, err: %v", err)
+   }
+   defer pluginMock.CleanUp()
+
+   test, err := newTransformTest(t, encryptionConfig, false, "", false)
+   if err != nil {
+       t.Fatalf("failed to start KUBE API Server with encryptionConfig\n %s, error: %v", encryptionConfig, err)
+   }
+   defer test.cleanUp()
+
+   testPod, err := test.createPod(testNamespace, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+   if err != nil {
+       t.Fatalf("Failed to create test pod, error: %v, ns: %s", err, testNamespace)
+   }
+   version1 := testPod.GetResourceVersion()
+
+   // 1. no-op update for the test pod should not result in any RV change
+   updatedPod, err := test.inplaceUpdatePod(testNamespace, testPod, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+   if err != nil {
+       t.Fatalf("Failed to update test pod, error: %v, ns: %s", err, testNamespace)
+   }
+   version2 := updatedPod.GetResourceVersion()
+   if version1 != version2 {
+       t.Fatalf("Resource version should not have changed. old pod: %v, new pod: %v", testPod, updatedPod)
+   }
+   // 2. no-op update for the test pod with keyID update should result in RV change
+   pluginMock.UpdateKeyID()
+   if err := kmsv2mock.WaitForBase64PluginToBeUpdated(pluginMock); err != nil {
+       t.Fatalf("Failed to update keyID for plugin, err: %v", err)
+   }
+   // Wait 1 sec (poll interval to check resource version) until a resource version change is detected or timeout at 1 minute.
+
+   version3 := ""
+   err = wait.Poll(time.Second, time.Minute,
+       func() (bool, error) {
+           updatedPod, err = test.inplaceUpdatePod(testNamespace, updatedPod, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+           if err != nil {
+               return false, err
+           }
+           version3 = updatedPod.GetResourceVersion()
+           if version1 != version3 {
+               return true, nil
+           }
+           return false, nil
+       })
+   if err != nil {
+       t.Fatalf("Failed to detect one resource version update within the allotted time after keyID is updated and pod has been inplace updated, err: %v, ns: %s", err, testNamespace)
+   }
+
+   if version1 == version3 {
+       t.Fatalf("Resource version should have changed after keyID update. old pod: %v, new pod: %v", testPod, updatedPod)
+   }
+
+   // 3. no-op update for the updated pod should not result in RV change
+   updatedPod, err = test.inplaceUpdatePod(testNamespace, updatedPod, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+   if err != nil {
+       t.Fatalf("Failed to update test pod, error: %v, ns: %s", err, testNamespace)
+   }
+   version4 := updatedPod.GetResourceVersion()
+   if version3 != version4 {
+       t.Fatalf("Resource version should not have changed again after the initial version updated as a result of the keyID update. old pod: %v, new pod: %v", testPod, updatedPod)
+   }
+
+   // 4. when kms-plugin is down, expect creation of new pod and encryption to fail
+   pluginMock.EnterFailedState()
+   mustBeUnHealthy(t, "/kms-providers",
+       "internal server error: kms-provider-0: rpc error: code = FailedPrecondition desc = failed precondition - key disabled",
+       test.kubeAPIServer.ClientConfig)
+
+   _, err = test.createPod(testNamespace, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+   if err == nil || !strings.Contains(err.Error(), "failed to encrypt") {
+       t.Fatalf("Create test pod should have failed due to encryption, ns: %s", testNamespace)
+   }
+
+   // 5. when kms-plugin is down, no-op update for a pod should succeed and not result in RV change
+   updatedPod, err = test.inplaceUpdatePod(testNamespace, updatedPod, dynamic.NewForConfigOrDie(test.kubeAPIServer.ClientConfig))
+   if err != nil {
+       t.Fatalf("Failed to perform no-op update on pod when kms-plugin is down, error: %v, ns: %s", err, testNamespace)
+   }
+   version5 := updatedPod.GetResourceVersion()
+   if version3 != version5 {
+       t.Fatalf("Resource version should not have changed again after the initial version updated as a result of the keyID update. old pod: %v, new pod: %v", testPod, updatedPod)
+   }
+}
+
func TestKMSv2Healthz(t *testing.T) {
    defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
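The resource-version assertions above lean on a property of the apiserver's storage layer: during an update it persists the object, and etcd bumps the resource version, when either the serialized bytes changed or the earlier read flagged the stored ciphertext as stale (for example after a key ID change). Roughly, as a simplified sketch rather than the store's actual code:

package example

import "bytes"

// shouldPersist sketches the decision that makes a no-op update rewrite data after a
// key rotation: write when the bytes differ or when the original read reported stale.
func shouldPersist(newData, origData []byte, origStale bool) bool {
    return origStale || !bytes.Equal(newData, origData)
}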
@@ -368,6 +368,10 @@ func createResource(client dynamic.Interface, gvr schema.GroupVersionResource, n
    return client.Resource(gvr).Namespace(ns).Create(context.TODO(), stubObj, metav1.CreateOptions{})
}

+func inplaceUpdateResource(client dynamic.Interface, gvr schema.GroupVersionResource, ns string, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+   return client.Resource(gvr).Namespace(ns).Update(context.TODO(), obj, metav1.UpdateOptions{})
+}
+
func getStubObj(gvr schema.GroupVersionResource) (*unstructured.Unstructured, error) {
    stub := ""
    if data, ok := etcd.GetEtcdStorageDataForNamespace(testNamespace)[gvr]; ok {
@@ -393,6 +397,15 @@ func (e *transformTest) createPod(namespace string, dynamicInterface dynamic.Int
    return pod, nil
}

+func (e *transformTest) inplaceUpdatePod(namespace string, obj *unstructured.Unstructured, dynamicInterface dynamic.Interface) (*unstructured.Unstructured, error) {
+   podGVR := gvr("", "v1", "pods")
+   pod, err := inplaceUpdateResource(dynamicInterface, podGVR, namespace, obj)
+   if err != nil {
+       return nil, fmt.Errorf("error while writing pod: %v", err)
+   }
+   return pod, nil
+}
+
func (e *transformTest) readRawRecordFromETCD(path string) (*clientv3.GetResponse, error) {
    rawClient, etcdClient, err := integration.GetEtcdClients(e.kubeAPIServer.ServerOpts.Etcd.StorageConfig.Transport)
    if err != nil {