From 3cbecf218dfea2e99ae95310ac03406d1d87a072 Mon Sep 17 00:00:00 2001 From: Rita Zhang Date: Wed, 9 Aug 2023 12:28:01 -0700 Subject: [PATCH] kmsv2: add metric for DEK cache filled Signed-off-by: Rita Zhang --- .../value/encrypt/envelope/kmsv2/cache.go | 9 +- .../encrypt/envelope/kmsv2/cache_test.go | 6 +- .../value/encrypt/envelope/kmsv2/envelope.go | 4 +- .../encrypt/envelope/kmsv2/envelope_test.go | 92 ++++++++++++++++++- .../value/encrypt/envelope/metrics/metrics.go | 16 ++++ 5 files changed, 116 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache.go index c677f54b5ba..be7a2a7f1a6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache.go @@ -26,6 +26,7 @@ import ( utilcache "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apiserver/pkg/storage/value" + "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics" "k8s.io/utils/clock" ) @@ -38,10 +39,11 @@ type simpleCache struct { ttl time.Duration // hashPool is a per cache pool of hash.Hash (to avoid allocations from building the Hash) // SHA-256 is used to prevent collisions - hashPool *sync.Pool + hashPool *sync.Pool + providerName string } -func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache { +func newSimpleCache(clock clock.Clock, ttl time.Duration, providerName string) *simpleCache { cache := utilcache.NewExpiringWithClock(clock) cache.AllowExpiredGet = true // for a given key, the value (the decryptTransformer) is always the same return &simpleCache{ @@ -52,6 +54,7 @@ func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache { return sha256.New() }, }, + providerName: providerName, } } @@ -73,6 +76,8 @@ func (c *simpleCache) set(key []byte, transformer value.Read) { panic("transformer must not be nil") } c.cache.Set(c.keyFunc(key), transformer, c.ttl) + // Add metrics for cache size + metrics.RecordDekSourceCacheSize(c.providerName, c.cache.Len()) } // keyFunc generates a string key by hashing the inputs. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache_test.go index c68fa540dad..1f686170eab 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/cache_test.go @@ -31,7 +31,7 @@ import ( func TestSimpleCacheSetError(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - cache := newSimpleCache(fakeClock, time.Second) + cache := newSimpleCache(fakeClock, time.Second, "providerName") tests := []struct { name string @@ -64,7 +64,7 @@ func TestSimpleCacheSetError(t *testing.T) { func TestKeyFunc(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - cache := newSimpleCache(fakeClock, time.Second) + cache := newSimpleCache(fakeClock, time.Second, "providerName") t.Run("AllocsPerRun test", func(t *testing.T) { key, err := generateKey(encryptedDEKSourceMaxSize) // simulate worst case EDEK @@ -99,7 +99,7 @@ func TestKeyFunc(t *testing.T) { func TestSimpleCache(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - cache := newSimpleCache(fakeClock, 5*time.Second) + cache := newSimpleCache(fakeClock, 5*time.Second, "providerName") transformer := &envelopeTransformer{} wg := sync.WaitGroup{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go index 45d5db58b75..1367d8fd963 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go @@ -127,7 +127,7 @@ func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, provide envelopeService: envelopeService, providerName: providerName, stateFunc: stateFunc, - cache: newSimpleCache(clock, cacheTTL), + cache: newSimpleCache(clock, cacheTTL, providerName), } } @@ -208,7 +208,6 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt // this prevents a cache miss every time the DEK rotates // this has the side benefit of causing the cache to perform a GC // TODO see if we can do this inside the stateFunc control loop - // TODO(aramase): Add metrics for cache size. t.cache.set(state.CacheKey, state.Transformer) requestInfo := getRequestInfoFromContext(ctx) @@ -250,7 +249,6 @@ func (t *envelopeTransformer) addTransformerForDecryption(cacheKey []byte, key [ if err != nil { return nil, err } - // TODO(aramase): Add metrics for cache size. t.cache.set(cacheKey, transformer) return transformer, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go index 28346055090..dbe489e7749 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go @@ -28,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -61,11 +62,11 @@ type testEnvelopeService struct { disabled bool keyVersion string ciphertext []byte - decryptCalls int + decryptCalls int32 } func (t *testEnvelopeService) Decrypt(ctx context.Context, uid string, req *kmsservice.DecryptRequest) ([]byte, error) { - t.decryptCalls++ + atomic.AddInt32(&t.decryptCalls, 1) if t.disabled { return nil, fmt.Errorf("Envelope service was disabled") } @@ -225,7 +226,7 @@ func TestEnvelopeCaching(t *testing.T) { } } } - if envelopeService.decryptCalls != tt.expectedDecryptCalls { + if int(envelopeService.decryptCalls) != tt.expectedDecryptCalls { t.Fatalf("expected %d decrypt calls, got %d", tt.expectedDecryptCalls, envelopeService.decryptCalls) } }) @@ -890,6 +891,91 @@ func TestEnvelopeMetrics(t *testing.T) { } } +// TestEnvelopeMetricsCache validates the correctness of the apiserver_envelope_encryption_dek_source_cache_size metric +// and asserts that all of the associated logic is go routine safe. +// 1. Multiple transformers are created, which should result in unique cache size for each provider +// 2. A transformer with known number of states was created to encrypt, then on restart, another transformer +// was created, which should result in expected number of cache keys for all the decryption calls for each +// state used previously for encryption. +func TestEnvelopeMetricsCache(t *testing.T) { + envelopeService := newTestEnvelopeService() + envelopeService.keyVersion = testKeyVersion + state, err := testStateFunc(testContext(t), envelopeService, clock.RealClock{}, randomBool())() + if err != nil { + t.Fatal(err) + } + ctx := testContext(t) + dataCtx := value.DefaultContext(testContextText) + provider1 := "one" + provider2 := "two" + numOfStates := 10 + + testCases := []struct { + desc string + metrics []string + want string + }{ + { + desc: "dek source cache size", + metrics: []string{ + "apiserver_envelope_encryption_dek_source_cache_size", + }, + want: fmt.Sprintf(` + # HELP apiserver_envelope_encryption_dek_source_cache_size [ALPHA] Number of records in data encryption key (DEK) source cache. On a restart, this value is an approximation of the number of decrypt RPC calls the server will make to the KMS plugin. + # TYPE apiserver_envelope_encryption_dek_source_cache_size gauge + apiserver_envelope_encryption_dek_source_cache_size{provider_name="%s"} %d + apiserver_envelope_encryption_dek_source_cache_size{provider_name="%s"} 1 + `, provider1, numOfStates, provider2), + }, + } + transformer1 := NewEnvelopeTransformer(envelopeService, provider1, func() (State, error) { + // return different states to ensure we get expected number of cache keys after restart on decryption + return testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())() + }) + transformer2 := NewEnvelopeTransformer(envelopeService, provider2, func() (State, error) { return state, nil }) + // used for restart + transformer3 := NewEnvelopeTransformer(envelopeService, provider1, func() (State, error) { return state, nil }) + var transformedDatas [][]byte + for j := 0; j < numOfStates; j++ { + transformedData, err := transformer1.TransformToStorage(ctx, []byte(testText), dataCtx) + if err != nil { + t.Fatal(err) + } + transformedDatas = append(transformedDatas, transformedData) + } + + for _, tt := range testCases { + t.Run(tt.desc, func(t *testing.T) { + metrics.DekSourceCacheSize.Reset() + var wg sync.WaitGroup + wg.Add(2 * numOfStates) + for i := 0; i < numOfStates; i++ { + i := i + go func() { + defer wg.Done() + // mimick a restart, the server will make decrypt RPC calls to the KMS plugin + // check cache metrics for the decrypt / read flow, which should repopulate the cache + if _, _, err := transformer3.TransformFromStorage(ctx, transformedDatas[i], dataCtx); err != nil { + panic(err) + } + }() + go func() { + defer wg.Done() + // check cache metrics for the encrypt / write flow + _, err := transformer2.TransformToStorage(ctx, []byte(testText), dataCtx) + if err != nil { + panic(err) + } + }() + } + wg.Wait() + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil { + t.Fatal(err) + } + }) + } +} + var flagOnce sync.Once // support running `go test -count X` func TestEnvelopeLogging(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics/metrics.go index ff3903805d6..be0f6dde932 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics/metrics.go @@ -156,6 +156,17 @@ var ( }, []string{"provider_name", "error"}, ) + + DekSourceCacheSize = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dek_source_cache_size", + Help: "Number of records in data encryption key (DEK) source cache. On a restart, this value is an approximation of the number of decrypt RPC calls the server will make to the KMS plugin.", + StabilityLevel: metrics.ALPHA, + }, + []string{"provider_name"}, + ) ) var registerMetricsFunc sync.Once @@ -197,6 +208,7 @@ func RegisterMetrics() { } legacyregistry.MustRegister(dekCacheFillPercent) legacyregistry.MustRegister(dekCacheInterArrivals) + legacyregistry.MustRegister(DekSourceCacheSize) legacyregistry.MustRegister(KeyIDHashTotal) legacyregistry.MustRegister(KeyIDHashLastTimestampSeconds) legacyregistry.MustRegister(KeyIDHashStatusLastTimestampSeconds) @@ -255,6 +267,10 @@ func RecordDekCacheFillPercent(percent float64) { dekCacheFillPercent.Set(percent) } +func RecordDekSourceCacheSize(providerName string, size int) { + DekSourceCacheSize.WithLabelValues(providerName).Set(float64(size)) +} + // RecordKMSOperationLatency records the latency of KMS operation. func RecordKMSOperationLatency(providerName, methodName string, duration time.Duration, err error) { KMSOperationsLatencyMetric.WithLabelValues(providerName, methodName, getErrorCode(err)).Observe(duration.Seconds())