Merge pull request #121095 from aramase/aramase/f/kmsv2_tracing

[KMSv2] Add tracing
This commit is contained in:
Kubernetes Prow Robot 2023-10-25 21:29:01 +02:00 committed by GitHub
commit ae603d5260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 385 additions and 6 deletions

View File

@ -28,6 +28,7 @@ import (
"unsafe" "unsafe"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/crypto/cryptobyte" "golang.org/x/crypto/cryptobyte"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -39,6 +40,7 @@ import (
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2" kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kmsservice "k8s.io/kms/pkg/service" kmsservice "k8s.io/kms/pkg/service"
"k8s.io/utils/clock" "k8s.io/utils/clock"
@ -133,11 +135,28 @@ func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, provide
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption. // TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
ctx, span := tracing.Start(ctx, "TransformFromStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
span.AddEvent("About to decode encrypted object")
// Deserialize the EncryptedObject from the data. // Deserialize the EncryptedObject from the data.
encryptedObject, err := t.doDecode(data) encryptedObject, err := t.doDecode(data)
if err != nil { if err != nil {
span.AddEvent("Decoding encrypted object failed")
span.RecordError(err)
return nil, false, err return nil, false, err
} }
span.AddEvent("Decoded encrypted object")
useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED
@ -158,6 +177,7 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// fallback to the envelope service if we do not have the transformer locally // fallback to the envelope service if we do not have the transformer locally
if transformer == nil { if transformer == nil {
span.AddEvent("About to decrypt DEK using remote service")
value.RecordCacheMiss() value.RecordCacheMiss()
requestInfo := getRequestInfoFromContext(ctx) requestInfo := getRequestInfoFromContext(ctx)
@ -172,8 +192,11 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
Annotations: encryptedObject.Annotations, Annotations: encryptedObject.Annotations,
}) })
if err != nil { if err != nil {
span.AddEvent("DEK decryption failed")
span.RecordError(err)
return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err) return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err)
} }
span.AddEvent("DEK decryption succeeded")
transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed) transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed)
if err != nil { if err != nil {
@ -182,11 +205,15 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
} }
metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID) metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID)
span.AddEvent("About to decrypt data using DEK")
out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx) out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
if err != nil { if err != nil {
span.AddEvent("Data decryption failed")
span.RecordError(err)
return nil, false, err return nil, false, err
} }
span.AddEvent("Data decryption succeeded")
// data is considered stale if the key ID does not match our current write transformer // data is considered stale if the key ID does not match our current write transformer
return out, return out,
stale || stale ||
@ -197,6 +224,19 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// TransformToStorage encrypts data to be written to disk using envelope encryption. // TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
ctx, span := tracing.Start(ctx, "TransformToStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
state, err := t.stateFunc() state, err := t.stateFunc()
if err != nil { if err != nil {
return nil, err return nil, err
@ -215,18 +255,31 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
"group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource, "group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource,
"verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name) "verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name)
span.AddEvent("About to encrypt data using DEK")
result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx) result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil { if err != nil {
span.AddEvent("Data encryption failed")
span.RecordError(err)
return nil, err return nil, err
} }
span.AddEvent("Data encryption succeeded")
metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID) metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID)
encObjectCopy := state.EncryptedObject encObjectCopy := state.EncryptedObject
encObjectCopy.EncryptedData = result encObjectCopy.EncryptedData = result
span.AddEvent("About to encode encrypted object")
// Serialize the EncryptedObject to a byte array. // Serialize the EncryptedObject to a byte array.
return t.doEncode(&encObjectCopy) out, err := t.doEncode(&encObjectCopy)
if err != nil {
span.AddEvent("Encoding encrypted object failed")
span.RecordError(err)
return nil, err
}
span.AddEvent("Encoded encrypted object")
return out, nil
} }
// addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads. // addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads.

View File

@ -33,6 +33,8 @@ import (
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
utilrand "k8s.io/apimachinery/pkg/util/rand" utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
@ -1262,6 +1264,165 @@ func TestGenerateTransformer(t *testing.T) {
} }
} }
// TestEnvelopeTracing_TransformToStorage checks the tracing output of
// envelopeTransformer.TransformToStorage. The call is made under a parent span
// backed by an in-memory span recorder; the test then requires that exactly one
// span has ended and that it carries the expected name, attribute and ordered
// list of events.
func TestEnvelopeTracing_TransformToStorage(t *testing.T) {
	type traceCase struct {
		desc     string
		expected []string
	}
	cases := []traceCase{
		{
			desc: "encrypt",
			expected: []string{
				"About to encrypt data using DEK",
				"Data encryption succeeded",
				"About to encode encrypted object",
				"Encoded encrypted object",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			// Route spans into an in-memory recorder so they can be inspected.
			recorder := tracetest.NewSpanRecorder()
			provider := trace.NewTracerProvider(trace.WithSpanProcessor(recorder))
			tracer := provider.Tracer("test")

			// The parent span stays open until the subtest returns, so it is
			// not among the ended spans inspected below.
			ctx, parent := tracer.Start(testContext(t), "parent")
			defer parent.End()

			svc := newTestEnvelopeService()
			fakeClock := testingclock.NewFakeClock(time.Now())
			stateFn := testStateFunc(ctx, svc, clock.RealClock{}, randomBool())
			state, err := stateFn()
			if err != nil {
				t.Fatal(err)
			}

			getState := func() (State, error) { return state, nil }
			transformer := newEnvelopeTransformerWithClock(svc, testProviderName,
				getState, testAPIServerID, 1*time.Second, fakeClock)

			dataCtx := value.DefaultContext([]byte(testContextText))
			plaintext := []byte(testText)

			_, err = transformer.TransformToStorage(ctx, plaintext, dataCtx)
			if err != nil {
				t.Fatalf("envelopeTransformer: error while transforming data to storage: %v", err)
			}

			spans := recorder.Ended()
			if len(spans) != 1 {
				t.Fatalf("expected 1 span, got %d", len(spans))
			}

			validateTraceSpan(t, spans[0], "TransformToStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected)
		})
	}
}
// TestEnvelopeTracing_TransformFromStorage checks the tracing output of
// envelopeTransformer.TransformFromStorage for three scenarios: a plain
// decrypt, a decrypt that has to call the remote KMS service, and a decrypt
// where that remote call fails.
//
// Each case encrypts a payload first (outside any span), steps the fake clock
// by two seconds, and only then starts the parent span, so the recorder is
// expected to hold just the span produced by the decrypt call.
func TestEnvelopeTracing_TransformFromStorage(t *testing.T) {
	testCases := []struct {
		desc string
		// cacheTTL is compared against the 2s clock step below: the 5s case
		// outlives the step, the 1s cases do not.
		cacheTTL time.Duration
		// simulateKMSPluginFailure disables the test envelope service right
		// before the decrypt call.
		simulateKMSPluginFailure bool
		// expected lists the span event names in the order they must appear.
		expected []string
	}{
		{
			desc:     "decrypt",
			cacheTTL: 5 * time.Second,
			expected: []string{
				"About to decode encrypted object",
				"Decoded encrypted object",
				"About to decrypt data using DEK",
				"Data decryption succeeded",
			},
		},
		{
			desc:     "decrypt with cache miss",
			cacheTTL: 1 * time.Second,
			expected: []string{
				"About to decode encrypted object",
				"Decoded encrypted object",
				"About to decrypt DEK using remote service",
				"DEK decryption succeeded",
				"About to decrypt data using DEK",
				"Data decryption succeeded",
			},
		},
		{
			desc:                     "decrypt with cache miss, simulate KMS plugin failure",
			cacheTTL:                 1 * time.Second,
			simulateKMSPluginFailure: true,
			expected: []string{
				"About to decode encrypted object",
				"Decoded encrypted object",
				"About to decrypt DEK using remote service",
				"DEK decryption failed",
				"exception",
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			fakeRecorder := tracetest.NewSpanRecorder()
			otelTracer := trace.NewTracerProvider(trace.WithSpanProcessor(fakeRecorder)).Tracer("test")

			ctx := testContext(t)

			envelopeService := newTestEnvelopeService()
			fakeClock := testingclock.NewFakeClock(time.Now())
			state, err := testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())()
			if err != nil {
				t.Fatal(err)
			}

			transformer := newEnvelopeTransformerWithClock(envelopeService, testProviderName,
				func() (State, error) { return state, nil }, testAPIServerID, tc.cacheTTL, fakeClock)

			dataCtx := value.DefaultContext([]byte(testContextText))
			originalText := []byte(testText)

			// The encrypt step is only setup, but it must succeed: decrypting
			// a nil payload would fail at decode time and the test would then
			// report a confusing event mismatch instead of the real cause.
			transformedData, err := transformer.TransformToStorage(ctx, originalText, dataCtx)
			if err != nil {
				t.Fatalf("envelopeTransformer: error while transforming data to storage: %v", err)
			}

			// advance the clock to allow cache entries to expire depending on TTL
			fakeClock.Step(2 * time.Second)
			// force GC to run by performing a write
			transformer.(*envelopeTransformer).cache.set([]byte("some-other-unrelated-key"), &envelopeTransformer{})

			envelopeService.SetDisabledStatus(tc.simulateKMSPluginFailure)

			// start recording only for the decrypt call
			ctx, span := otelTracer.Start(ctx, "parent")
			defer span.End()

			// The result and error are intentionally ignored: the failure case
			// expects an error, and every case is judged solely by the span
			// events validated below.
			_, _, _ = transformer.TransformFromStorage(ctx, transformedData, dataCtx)

			output := fakeRecorder.Ended()
			// Fail cleanly rather than panic on output[0] if no span (or an
			// unexpected number of spans) was recorded.
			if len(output) != 1 {
				t.Fatalf("expected 1 span, got %d", len(output))
			}

			validateTraceSpan(t, output[0], "TransformFromStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected)
		})
	}
}
// validateTraceSpan asserts that a recorded span looks like one emitted by the
// envelope transformer.
//
// It checks, in order:
//   - the span name equals spanName (fatal on mismatch);
//   - the span has exactly one attribute (fatal on mismatch), and that
//     attribute is "transformer.provider.name" with value providerName;
//   - the span has len(expected) events (fatal on mismatch) whose names match
//     expected position by position.
//
// apiserverID is currently unused: the span is required to carry exactly one
// attribute, so there is no API server ID attribute to compare against. The
// parameter is kept so existing callers continue to compile.
func validateTraceSpan(t *testing.T, span trace.ReadOnlySpan, spanName, providerName, apiserverID string, expected []string) {
	t.Helper()

	if span.Name() != spanName {
		t.Fatalf("expected span name %q, got %q", spanName, span.Name())
	}

	attrs := span.Attributes()
	if len(attrs) != 1 {
		t.Fatalf("expected 1 attributes, got %d", len(attrs))
	}
	// A wrong key OR a wrong value is a failure. Joining the two conditions
	// with && would only fail when both are wrong, letting a bad provider name
	// under the right key slip through.
	if attrs[0].Key != "transformer.provider.name" || attrs[0].Value.AsString() != providerName {
		t.Errorf("expected attribute %q=%q, got %q=%q",
			"transformer.provider.name", providerName, attrs[0].Key, attrs[0].Value.AsString())
	}

	events := span.Events()
	if len(events) != len(expected) {
		t.Fatalf("expected %d events, got %d", len(expected), len(events))
	}
	for i, event := range events {
		if event.Name != expected[i] {
			t.Errorf("expected event %q, got %q", expected[i], event.Name)
		}
	}
}
func errString(err error) string { func errString(err error) string {
if err == nil { if err == nil {
return "" return ""

View File

@ -68,6 +68,12 @@ func (s *Span) End(logThreshold time.Duration) {
} }
} }
// RecordError reports err on this span's underlying OpenTelemetry span, which
// records it as an exception span event. Any attributes supplied are attached
// to that event. Per the OpenTelemetry contract, nothing happens when the span
// is not being recorded or when err is nil.
func (s *Span) RecordError(err error, attributes ...attribute.KeyValue) {
	eventAttrs := trace.WithAttributes(attributes...)
	s.otelSpan.RecordError(err, eventAttrs)
}
func attributesToFields(attributes []attribute.KeyValue) []utiltrace.Field { func attributesToFields(attributes []attribute.KeyValue) []utiltrace.Field {
fields := make([]utiltrace.Field, len(attributes)) fields := make([]utiltrace.Field, len(attributes))
for i := range attributes { for i := range attributes {

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"context" "context"
"flag" "flag"
"fmt"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -56,6 +57,9 @@ func TestOpenTelemetryTracing(t *testing.T) {
tr.AddEvent("reticulated splines", attribute.Bool("should I do it?", false)) // took 5ms tr.AddEvent("reticulated splines", attribute.Bool("should I do it?", false)) // took 5ms
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Add error event to the frobber span
tr.RecordError(fmt.Errorf("something went wrong"))
// Ensure setting context with span makes the next span a child // Ensure setting context with span makes the next span a child
ctx = ContextWithSpan(context.Background(), tr) ctx = ContextWithSpan(context.Background(), tr)
@ -87,7 +91,7 @@ func TestOpenTelemetryTracing(t *testing.T) {
if len(child.Attributes()) != 1 { if len(child.Attributes()) != 1 {
t.Errorf("got attributes %v; expected one attribute in child.Attributes()", child.Attributes()) t.Errorf("got attributes %v; expected one attribute in child.Attributes()", child.Attributes())
} }
if len(child.Events()) != 2 { if len(child.Events()) != 3 {
t.Errorf("got events %v; expected 2 events in child.Events()", child.Events()) t.Errorf("got events %v; expected 2 events in child.Events()", child.Events())
} }
if child.Events()[0].Name != "reticulated splines" { if child.Events()[0].Name != "reticulated splines" {
@ -96,11 +100,17 @@ func TestOpenTelemetryTracing(t *testing.T) {
if len(child.Events()[0].Attributes) != 1 { if len(child.Events()[0].Attributes) != 1 {
t.Errorf("got event %v; expected 1 attribute in child.Events()[0].Attributes", child.Events()[0]) t.Errorf("got event %v; expected 1 attribute in child.Events()[0].Attributes", child.Events()[0])
} }
if child.Events()[1].Name != "sequenced particles" { if child.Events()[1].Name != "exception" {
t.Errorf("got event %v; expected child.Events()[1].Name == sequenced particles", child.Events()[1]) t.Errorf("got event %v; expected child.Events()[1].Name == something went wrong", child.Events()[1])
} }
if len(child.Events()[1].Attributes) != 1 { if len(child.Events()[1].Attributes) != 2 {
t.Errorf("got event %v; expected 1 attribute in child.Events()[1].Attributes", child.Events()[1]) t.Errorf("got event %#v; expected 2 attribute in child.Events()[1].Attributes", child.Events()[1])
}
if child.Events()[2].Name != "sequenced particles" {
t.Errorf("got event %v; expected child.Events()[2].Name == sequenced particles", child.Events()[2])
}
if len(child.Events()[2].Attributes) != 1 {
t.Errorf("got event %v; expected 1 attribute in child.Events()[2].Attributes", child.Events()[2])
} }
// Parent span is ended last // Parent span is ended last
parent := output[2] parent := output[2]

View File

@ -1,3 +1,6 @@
//go:build !windows
// +build !windows
/* /*
Copyright 2021 The Kubernetes Authors. Copyright 2021 The Kubernetes Authors.
@ -37,12 +40,158 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2"
client "k8s.io/client-go/kubernetes" client "k8s.io/client-go/kubernetes"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
// TestAPIServerTracingWithKMSv2 starts a test API server configured with both
// a tracing endpoint and a KMS v2 encryption provider for secrets, then checks
// that creating and reading a secret produce the expected envelope-transformer
// spans (name, provider-name attribute and ordered events).
//
// Spans are received by a local gRPC trace service; each subtest installs its
// expectations on that fake server, issues one API call, and waits up to 30s
// for the fake server to signal on traceFound.
//
// NOTE(review): the YAML documents embedded below are reproduced exactly as
// they appear in this view, which shows no leading indentation on any line —
// confirm the nesting against the original file before relying on them.
func TestAPIServerTracingWithKMSv2(t *testing.T) {
	// Listen for traces from the API Server before starting it, so the
	// API Server will successfully connect right away during the test.
	listener, err := net.Listen("tcp", "localhost:")
	if err != nil {
		t.Fatal(err)
	}

	// Write the encryption configuration (KMS v2 provider for secrets) to a file
	encryptionConfigFile, err := os.CreateTemp("", "encryption-config.yaml")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(encryptionConfigFile.Name())

	if err := os.WriteFile(encryptionConfigFile.Name(), []byte(`
apiVersion: apiserver.config.k8s.io/v1
kind: EncryptionConfiguration
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v2
name: kms-provider
endpoint: unix:///@kms-provider.sock`), os.FileMode(0755)); err != nil {
		t.Fatal(err)
	}

	// Write the configuration for tracing to a file
	tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(tracingConfigFile.Name())

	if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
apiVersion: apiserver.config.k8s.io/v1beta1
kind: TracingConfiguration
samplingRatePerMillion: 1000000
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
		t.Fatal(err)
	}

	// Serve the fake trace collector on the listener opened above.
	srv := grpc.NewServer()
	fakeServer := &traceServer{t: t}
	fakeServer.resetExpectations([]*spanExpectation{})
	traceservice.RegisterTraceServiceServer(srv, fakeServer)

	go func() {
		if err := srv.Serve(listener); err != nil {
			t.Error(err)
			return
		}
	}()
	defer srv.Stop()

	// Start the mock KMS v2 plugin on the socket named in the encryption config.
	_ = kmsv2mock.NewBase64Plugin(t, "@kms-provider.sock")

	// Start the API Server with our tracing configuration
	testServer := kubeapiservertesting.StartTestServerOrDie(t,
		kubeapiservertesting.NewDefaultTestServerOptions(),
		[]string{
			"--tracing-config-file=" + tracingConfigFile.Name(),
			"--encryption-provider-config=" + encryptionConfigFile.Name(),
		},
		framework.SharedEtcd(),
	)
	defer testServer.TearDownFn()
	clientSet, err := client.NewForConfig(testServer.ClientConfig)
	if err != nil {
		t.Fatal(err)
	}

	for _, tc := range []struct {
		desc          string
		apiCall       func(client.Interface) error
		expectedTrace []*spanExpectation
	}{
		{
			desc: "create secret",
			// Use the client handed to the closure and a closure-local err,
			// rather than reaching out to the enclosing function's variables.
			apiCall: func(c client.Interface) error {
				_, err := c.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(),
					&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{})
				return err
			},
			expectedTrace: []*spanExpectation{
				{
					name: "TransformToStorage with envelopeTransformer",
					attributes: map[string]func(*commonv1.AnyValue) bool{
						"transformer.provider.name": func(v *commonv1.AnyValue) bool {
							return v.GetStringValue() == "kms-provider"
						},
					},
					events: []string{
						"About to encrypt data using DEK",
						"Data encryption succeeded",
						"About to encode encrypted object",
						"Encoded encrypted object",
					},
				},
			},
		},
		{
			desc: "get secret",
			apiCall: func(c client.Interface) error {
				// This depends on the "create secret" step having completed successfully
				_, err := c.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{})
				return err
			},
			expectedTrace: []*spanExpectation{
				{
					name: "TransformFromStorage with envelopeTransformer",
					attributes: map[string]func(*commonv1.AnyValue) bool{
						"transformer.provider.name": func(v *commonv1.AnyValue) bool {
							return v.GetStringValue() == "kms-provider"
						},
					},
					events: []string{
						"About to decode encrypted object",
						"Decoded encrypted object",
						"About to decrypt data using DEK",
						"Data decryption succeeded",
					},
				},
			},
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			fakeServer.resetExpectations(tc.expectedTrace)

			// Make our call to the API server
			if err := tc.apiCall(clientSet); err != nil {
				t.Fatal(err)
			}

			// Wait for a span to be recorded from our request
			select {
			case <-fakeServer.traceFound:
			case <-time.After(30 * time.Second):
				t.Fatal("Timed out waiting for trace")
			}
		})
	}
}
func TestAPIServerTracingWithEgressSelector(t *testing.T) { func TestAPIServerTracingWithEgressSelector(t *testing.T) {
// Listen for traces from the API Server before starting it, so the // Listen for traces from the API Server before starting it, so the
// API Server will successfully connect right away during the test. // API Server will successfully connect right away during the test.