mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 03:03:59 +00:00
Merge pull request #119878 from ritazh/kmsv2-metrics-dekcachesize
kmsv2: add metric for DEK cache filled
This commit is contained in:
commit
16310c959d
@ -26,6 +26,7 @@ import (
|
||||
|
||||
utilcache "k8s.io/apimachinery/pkg/util/cache"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
@ -38,10 +39,11 @@ type simpleCache struct {
|
||||
ttl time.Duration
|
||||
// hashPool is a per cache pool of hash.Hash (to avoid allocations from building the Hash)
|
||||
// SHA-256 is used to prevent collisions
|
||||
hashPool *sync.Pool
|
||||
hashPool *sync.Pool
|
||||
providerName string
|
||||
}
|
||||
|
||||
func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache {
|
||||
func newSimpleCache(clock clock.Clock, ttl time.Duration, providerName string) *simpleCache {
|
||||
cache := utilcache.NewExpiringWithClock(clock)
|
||||
cache.AllowExpiredGet = true // for a given key, the value (the decryptTransformer) is always the same
|
||||
return &simpleCache{
|
||||
@ -52,6 +54,7 @@ func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache {
|
||||
return sha256.New()
|
||||
},
|
||||
},
|
||||
providerName: providerName,
|
||||
}
|
||||
}
|
||||
|
||||
@ -73,6 +76,8 @@ func (c *simpleCache) set(key []byte, transformer value.Read) {
|
||||
panic("transformer must not be nil")
|
||||
}
|
||||
c.cache.Set(c.keyFunc(key), transformer, c.ttl)
|
||||
// Add metrics for cache size
|
||||
metrics.RecordDekSourceCacheSize(c.providerName, c.cache.Len())
|
||||
}
|
||||
|
||||
// keyFunc generates a string key by hashing the inputs.
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
|
||||
func TestSimpleCacheSetError(t *testing.T) {
|
||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||
cache := newSimpleCache(fakeClock, time.Second)
|
||||
cache := newSimpleCache(fakeClock, time.Second, "providerName")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
@ -64,7 +64,7 @@ func TestSimpleCacheSetError(t *testing.T) {
|
||||
|
||||
func TestKeyFunc(t *testing.T) {
|
||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||
cache := newSimpleCache(fakeClock, time.Second)
|
||||
cache := newSimpleCache(fakeClock, time.Second, "providerName")
|
||||
|
||||
t.Run("AllocsPerRun test", func(t *testing.T) {
|
||||
key, err := generateKey(encryptedDEKSourceMaxSize) // simulate worst case EDEK
|
||||
@ -99,7 +99,7 @@ func TestKeyFunc(t *testing.T) {
|
||||
|
||||
func TestSimpleCache(t *testing.T) {
|
||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
||||
cache := newSimpleCache(fakeClock, 5*time.Second)
|
||||
cache := newSimpleCache(fakeClock, 5*time.Second, "providerName")
|
||||
transformer := &envelopeTransformer{}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
@ -127,7 +127,7 @@ func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, provide
|
||||
envelopeService: envelopeService,
|
||||
providerName: providerName,
|
||||
stateFunc: stateFunc,
|
||||
cache: newSimpleCache(clock, cacheTTL),
|
||||
cache: newSimpleCache(clock, cacheTTL, providerName),
|
||||
}
|
||||
}
|
||||
|
||||
@ -208,7 +208,6 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
|
||||
// this prevents a cache miss every time the DEK rotates
|
||||
// this has the side benefit of causing the cache to perform a GC
|
||||
// TODO see if we can do this inside the stateFunc control loop
|
||||
// TODO(aramase): Add metrics for cache size.
|
||||
t.cache.set(state.CacheKey, state.Transformer)
|
||||
|
||||
requestInfo := getRequestInfoFromContext(ctx)
|
||||
@ -250,7 +249,6 @@ func (t *envelopeTransformer) addTransformerForDecryption(cacheKey []byte, key [
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO(aramase): Add metrics for cache size.
|
||||
t.cache.set(cacheKey, transformer)
|
||||
return transformer, nil
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -61,11 +62,11 @@ type testEnvelopeService struct {
|
||||
disabled bool
|
||||
keyVersion string
|
||||
ciphertext []byte
|
||||
decryptCalls int
|
||||
decryptCalls int32
|
||||
}
|
||||
|
||||
func (t *testEnvelopeService) Decrypt(ctx context.Context, uid string, req *kmsservice.DecryptRequest) ([]byte, error) {
|
||||
t.decryptCalls++
|
||||
atomic.AddInt32(&t.decryptCalls, 1)
|
||||
if t.disabled {
|
||||
return nil, fmt.Errorf("Envelope service was disabled")
|
||||
}
|
||||
@ -225,7 +226,7 @@ func TestEnvelopeCaching(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if envelopeService.decryptCalls != tt.expectedDecryptCalls {
|
||||
if int(envelopeService.decryptCalls) != tt.expectedDecryptCalls {
|
||||
t.Fatalf("expected %d decrypt calls, got %d", tt.expectedDecryptCalls, envelopeService.decryptCalls)
|
||||
}
|
||||
})
|
||||
@ -890,6 +891,91 @@ func TestEnvelopeMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnvelopeMetricsCache validates the correctness of the apiserver_envelope_encryption_dek_source_cache_size metric
|
||||
// and asserts that all of the associated logic is go routine safe.
|
||||
// 1. Multiple transformers are created, which should result in unique cache size for each provider
|
||||
// 2. A transformer with known number of states was created to encrypt, then on restart, another transformer
|
||||
// was created, which should result in expected number of cache keys for all the decryption calls for each
|
||||
// state used previously for encryption.
|
||||
func TestEnvelopeMetricsCache(t *testing.T) {
|
||||
envelopeService := newTestEnvelopeService()
|
||||
envelopeService.keyVersion = testKeyVersion
|
||||
state, err := testStateFunc(testContext(t), envelopeService, clock.RealClock{}, randomBool())()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctx := testContext(t)
|
||||
dataCtx := value.DefaultContext(testContextText)
|
||||
provider1 := "one"
|
||||
provider2 := "two"
|
||||
numOfStates := 10
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
metrics []string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
desc: "dek source cache size",
|
||||
metrics: []string{
|
||||
"apiserver_envelope_encryption_dek_source_cache_size",
|
||||
},
|
||||
want: fmt.Sprintf(`
|
||||
# HELP apiserver_envelope_encryption_dek_source_cache_size [ALPHA] Number of records in data encryption key (DEK) source cache. On a restart, this value is an approximation of the number of decrypt RPC calls the server will make to the KMS plugin.
|
||||
# TYPE apiserver_envelope_encryption_dek_source_cache_size gauge
|
||||
apiserver_envelope_encryption_dek_source_cache_size{provider_name="%s"} %d
|
||||
apiserver_envelope_encryption_dek_source_cache_size{provider_name="%s"} 1
|
||||
`, provider1, numOfStates, provider2),
|
||||
},
|
||||
}
|
||||
transformer1 := NewEnvelopeTransformer(envelopeService, provider1, func() (State, error) {
|
||||
// return different states to ensure we get expected number of cache keys after restart on decryption
|
||||
return testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())()
|
||||
})
|
||||
transformer2 := NewEnvelopeTransformer(envelopeService, provider2, func() (State, error) { return state, nil })
|
||||
// used for restart
|
||||
transformer3 := NewEnvelopeTransformer(envelopeService, provider1, func() (State, error) { return state, nil })
|
||||
var transformedDatas [][]byte
|
||||
for j := 0; j < numOfStates; j++ {
|
||||
transformedData, err := transformer1.TransformToStorage(ctx, []byte(testText), dataCtx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
transformedDatas = append(transformedDatas, transformedData)
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.desc, func(t *testing.T) {
|
||||
metrics.DekSourceCacheSize.Reset()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2 * numOfStates)
|
||||
for i := 0; i < numOfStates; i++ {
|
||||
i := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// mimick a restart, the server will make decrypt RPC calls to the KMS plugin
|
||||
// check cache metrics for the decrypt / read flow, which should repopulate the cache
|
||||
if _, _, err := transformer3.TransformFromStorage(ctx, transformedDatas[i], dataCtx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// check cache metrics for the encrypt / write flow
|
||||
_, err := transformer2.TransformToStorage(ctx, []byte(testText), dataCtx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var flagOnce sync.Once // support running `go test -count X`
|
||||
|
||||
func TestEnvelopeLogging(t *testing.T) {
|
||||
|
@ -156,6 +156,17 @@ var (
|
||||
},
|
||||
[]string{"provider_name", "error"},
|
||||
)
|
||||
|
||||
DekSourceCacheSize = metrics.NewGaugeVec(
|
||||
&metrics.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "dek_source_cache_size",
|
||||
Help: "Number of records in data encryption key (DEK) source cache. On a restart, this value is an approximation of the number of decrypt RPC calls the server will make to the KMS plugin.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"provider_name"},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetricsFunc sync.Once
|
||||
@ -197,6 +208,7 @@ func RegisterMetrics() {
|
||||
}
|
||||
legacyregistry.MustRegister(dekCacheFillPercent)
|
||||
legacyregistry.MustRegister(dekCacheInterArrivals)
|
||||
legacyregistry.MustRegister(DekSourceCacheSize)
|
||||
legacyregistry.MustRegister(KeyIDHashTotal)
|
||||
legacyregistry.MustRegister(KeyIDHashLastTimestampSeconds)
|
||||
legacyregistry.MustRegister(KeyIDHashStatusLastTimestampSeconds)
|
||||
@ -255,6 +267,10 @@ func RecordDekCacheFillPercent(percent float64) {
|
||||
dekCacheFillPercent.Set(percent)
|
||||
}
|
||||
|
||||
func RecordDekSourceCacheSize(providerName string, size int) {
|
||||
DekSourceCacheSize.WithLabelValues(providerName).Set(float64(size))
|
||||
}
|
||||
|
||||
// RecordKMSOperationLatency records the latency of KMS operation.
|
||||
func RecordKMSOperationLatency(providerName, methodName string, duration time.Duration, err error) {
|
||||
KMSOperationsLatencyMetric.WithLabelValues(providerName, methodName, getErrorCode(err)).Observe(duration.Seconds())
|
||||
|
Loading…
Reference in New Issue
Block a user