diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go index 23de3717a18..3821ad403a9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope.go @@ -28,6 +28,7 @@ import ( "unsafe" "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/otel/attribute" "golang.org/x/crypto/cryptobyte" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -39,6 +40,7 @@ import ( aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics" + "k8s.io/component-base/tracing" "k8s.io/klog/v2" kmsservice "k8s.io/kms/pkg/service" "k8s.io/utils/clock" @@ -133,11 +135,28 @@ func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, provide // TransformFromStorage decrypts data encrypted by this transformer using envelope encryption. func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + ctx, span := tracing.Start(ctx, "TransformFromStorage with envelopeTransformer", + attribute.String("transformer.provider.name", t.providerName), + // The service.instance_id of the apiserver is already available in the trace + /* + { + "key": "service.instance.id", + "type": "string", + "value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m" + } + */ + ) + defer span.End(500 * time.Millisecond) + + span.AddEvent("About to decode encrypted object") // Deserialize the EncryptedObject from the data. 
encryptedObject, err := t.doDecode(data) if err != nil { + span.AddEvent("Decoding encrypted object failed") + span.RecordError(err) return nil, false, err } + span.AddEvent("Decoded encrypted object") useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED @@ -158,6 +177,7 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b // fallback to the envelope service if we do not have the transformer locally if transformer == nil { + span.AddEvent("About to decrypt DEK using remote service") value.RecordCacheMiss() requestInfo := getRequestInfoFromContext(ctx) @@ -172,8 +192,11 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b Annotations: encryptedObject.Annotations, }) if err != nil { + span.AddEvent("DEK decryption failed") + span.RecordError(err) return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err) } + span.AddEvent("DEK decryption succeeded") transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed) if err != nil { @@ -182,11 +205,15 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b } metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID) + span.AddEvent("About to decrypt data using DEK") out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx) if err != nil { + span.AddEvent("Data decryption failed") + span.RecordError(err) return nil, false, err } + span.AddEvent("Data decryption succeeded") // data is considered stale if the key ID does not match our current write transformer return out, stale || @@ -197,6 +224,19 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b // TransformToStorage encrypts data to be written to disk using envelope encryption. 
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { + ctx, span := tracing.Start(ctx, "TransformToStorage with envelopeTransformer", + attribute.String("transformer.provider.name", t.providerName), + // The service.instance_id of the apiserver is already available in the trace + /* + { + "key": "service.instance.id", + "type": "string", + "value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m" + } + */ + ) + defer span.End(500 * time.Millisecond) + state, err := t.stateFunc() if err != nil { return nil, err @@ -215,18 +255,31 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt "group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource, "verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name) + span.AddEvent("About to encrypt data using DEK") result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx) if err != nil { + span.AddEvent("Data encryption failed") + span.RecordError(err) return nil, err } + span.AddEvent("Data encryption succeeded") metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID) encObjectCopy := state.EncryptedObject encObjectCopy.EncryptedData = result + span.AddEvent("About to encode encrypted object") // Serialize the EncryptedObject to a byte array. - return t.doEncode(&encObjectCopy) + out, err := t.doEncode(&encObjectCopy) + if err != nil { + span.AddEvent("Encoding encrypted object failed") + span.RecordError(err) + return nil, err + } + span.AddEvent("Encoded encrypted object") + + return out, nil } // addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads. 
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go index f52d4cd9d46..57f957a1bb0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/envelope_test.go @@ -33,6 +33,8 @@ import ( "time" "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" @@ -1262,6 +1264,165 @@ func TestGenerateTransformer(t *testing.T) { } } +func TestEnvelopeTracing_TransformToStorage(t *testing.T) { + testCases := []struct { + desc string + expected []string + }{ + { + desc: "encrypt", + expected: []string{ + "About to encrypt data using DEK", + "Data encryption succeeded", + "About to encode encrypted object", + "Encoded encrypted object", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + fakeRecorder := tracetest.NewSpanRecorder() + otelTracer := trace.NewTracerProvider(trace.WithSpanProcessor(fakeRecorder)).Tracer("test") + + ctx := testContext(t) + ctx, span := otelTracer.Start(ctx, "parent") + defer span.End() + + envelopeService := newTestEnvelopeService() + fakeClock := testingclock.NewFakeClock(time.Now()) + state, err := testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())() + if err != nil { + t.Fatal(err) + } + + transformer := newEnvelopeTransformerWithClock(envelopeService, testProviderName, + func() (State, error) { return state, nil }, testAPIServerID, 1*time.Second, fakeClock) + + dataCtx := value.DefaultContext([]byte(testContextText)) + originalText := []byte(testText) + + if _, err := transformer.TransformToStorage(ctx, originalText, dataCtx); err != nil { + t.Fatalf("envelopeTransformer: error while transforming data 
to storage: %v", err) + } + + output := fakeRecorder.Ended() + if len(output) != 1 { + t.Fatalf("expected 1 span, got %d", len(output)) + } + out := output[0] + validateTraceSpan(t, out, "TransformToStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected) + }) + } +} + +func TestEnvelopeTracing_TransformFromStorage(t *testing.T) { + testCases := []struct { + desc string + cacheTTL time.Duration + simulateKMSPluginFailure bool + expected []string + }{ + { + desc: "decrypt", + cacheTTL: 5 * time.Second, + expected: []string{ + "About to decode encrypted object", + "Decoded encrypted object", + "About to decrypt data using DEK", + "Data decryption succeeded", + }, + }, + { + desc: "decrypt with cache miss", + cacheTTL: 1 * time.Second, + expected: []string{ + "About to decode encrypted object", + "Decoded encrypted object", + "About to decrypt DEK using remote service", + "DEK decryption succeeded", + "About to decrypt data using DEK", + "Data decryption succeeded", + }, + }, + { + desc: "decrypt with cache miss, simulate KMS plugin failure", + cacheTTL: 1 * time.Second, + simulateKMSPluginFailure: true, + expected: []string{ + "About to decode encrypted object", + "Decoded encrypted object", + "About to decrypt DEK using remote service", + "DEK decryption failed", + "exception", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + fakeRecorder := tracetest.NewSpanRecorder() + otelTracer := trace.NewTracerProvider(trace.WithSpanProcessor(fakeRecorder)).Tracer("test") + + ctx := testContext(t) + + envelopeService := newTestEnvelopeService() + fakeClock := testingclock.NewFakeClock(time.Now()) + state, err := testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())() + if err != nil { + t.Fatal(err) + } + + transformer := newEnvelopeTransformerWithClock(envelopeService, testProviderName, + func() (State, error) { return state, nil }, testAPIServerID, tc.cacheTTL, fakeClock) + + dataCtx := 
value.DefaultContext([]byte(testContextText)) + originalText := []byte(testText) + + transformedData, _ := transformer.TransformToStorage(ctx, originalText, dataCtx) + + // advance the clock to allow cache entries to expire depending on TTL + fakeClock.Step(2 * time.Second) + // force GC to run by performing a write + transformer.(*envelopeTransformer).cache.set([]byte("some-other-unrelated-key"), &envelopeTransformer{}) + + envelopeService.SetDisabledStatus(tc.simulateKMSPluginFailure) + + // start recording only for the decrypt call + ctx, span := otelTracer.Start(ctx, "parent") + defer span.End() + + _, _, _ = transformer.TransformFromStorage(ctx, transformedData, dataCtx) + + output := fakeRecorder.Ended() + validateTraceSpan(t, output[0], "TransformFromStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected) + }) + } +} + +func validateTraceSpan(t *testing.T, span trace.ReadOnlySpan, spanName, providerName, apiserverID string, expected []string) { + t.Helper() + + if span.Name() != spanName { + t.Fatalf("expected span name %q, got %q", spanName, span.Name()) + } + attrs := span.Attributes() + if len(attrs) != 1 { + t.Fatalf("expected 1 attributes, got %d", len(attrs)) + } + if attrs[0].Key != "transformer.provider.name" || attrs[0].Value.AsString() != providerName { // fail when either the key or the value is wrong + t.Errorf("expected providerName %q, got %q", providerName, attrs[0].Value.AsString()) + } + if len(span.Events()) != len(expected) { + t.Fatalf("expected %d events, got %d", len(expected), len(span.Events())) + } + for i, event := range span.Events() { + if event.Name != expected[i] { + t.Errorf("expected event %q, got %q", expected[i], event.Name) + } + } +} + func errString(err error) string { if err == nil { return "" diff --git a/staging/src/k8s.io/component-base/tracing/tracing.go b/staging/src/k8s.io/component-base/tracing/tracing.go index 50894eb3b9b..bdf6f377dde 100644 --- a/staging/src/k8s.io/component-base/tracing/tracing.go +++ 
b/staging/src/k8s.io/component-base/tracing/tracing.go @@ -68,6 +68,12 @@ func (s *Span) End(logThreshold time.Duration) { } } +// RecordError will record err as an exception span event for this span. +// If this span is not being recorded or err is nil then this method does nothing. +func (s *Span) RecordError(err error, attributes ...attribute.KeyValue) { + s.otelSpan.RecordError(err, trace.WithAttributes(attributes...)) +} + func attributesToFields(attributes []attribute.KeyValue) []utiltrace.Field { fields := make([]utiltrace.Field, len(attributes)) for i := range attributes { diff --git a/staging/src/k8s.io/component-base/tracing/tracing_test.go b/staging/src/k8s.io/component-base/tracing/tracing_test.go index 0e3e12c6071..145eb57b095 100644 --- a/staging/src/k8s.io/component-base/tracing/tracing_test.go +++ b/staging/src/k8s.io/component-base/tracing/tracing_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "flag" + "fmt" "strings" "testing" "time" @@ -56,6 +57,9 @@ func TestOpenTelemetryTracing(t *testing.T) { tr.AddEvent("reticulated splines", attribute.Bool("should I do it?", false)) // took 5ms time.Sleep(10 * time.Millisecond) + // Add error event to the frobber span + tr.RecordError(fmt.Errorf("something went wrong")) + // Ensure setting context with span makes the next span a child ctx = ContextWithSpan(context.Background(), tr) @@ -87,7 +91,7 @@ func TestOpenTelemetryTracing(t *testing.T) { if len(child.Attributes()) != 1 { t.Errorf("got attributes %v; expected one attribute in child.Attributes()", child.Attributes()) } - if len(child.Events()) != 2 { + if len(child.Events()) != 3 { t.Errorf("got events %v; expected 2 events in child.Events()", child.Events()) } if child.Events()[0].Name != "reticulated splines" { @@ -96,11 +100,17 @@ func TestOpenTelemetryTracing(t *testing.T) { if len(child.Events()[0].Attributes) != 1 { t.Errorf("got event %v; expected 1 attribute in child.Events()[0].Attributes", child.Events()[0]) } - if child.Events()[1].Name 
!= "sequenced particles" { - t.Errorf("got event %v; expected child.Events()[1].Name == sequenced particles", child.Events()[1]) + if child.Events()[1].Name != "exception" { // the RecordError call above is expected to surface as an event named "exception" + t.Errorf("got event %v; expected child.Events()[1].Name == exception", child.Events()[1]) } - if len(child.Events()[1].Attributes) != 1 { - t.Errorf("got event %v; expected 1 attribute in child.Events()[1].Attributes", child.Events()[1]) + if len(child.Events()[1].Attributes) != 2 { + t.Errorf("got event %#v; expected 2 attributes in child.Events()[1].Attributes", child.Events()[1]) + } + if child.Events()[2].Name != "sequenced particles" { + t.Errorf("got event %v; expected child.Events()[2].Name == sequenced particles", child.Events()[2]) + } + if len(child.Events()[2].Attributes) != 1 { + t.Errorf("got event %v; expected 1 attribute in child.Events()[2].Attributes", child.Events()[2]) } // Parent span is ended last parent := output[2] diff --git a/test/integration/apiserver/tracing/tracing_test.go b/test/integration/apiserver/tracing/tracing_test.go index 23594843a8f..c3cdc15bb5e 100644 --- a/test/integration/apiserver/tracing/tracing_test.go +++ b/test/integration/apiserver/tracing/tracing_test.go @@ -1,3 +1,6 @@ +//go:build !windows +// +build !windows + /* Copyright 2021 The Kubernetes Authors. @@ -37,12 +40,158 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2" client "k8s.io/client-go/kubernetes" utiltesting "k8s.io/client-go/util/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) +func TestAPIServerTracingWithKMSv2(t *testing.T) { + // Listen for traces from the API Server before starting it, so the + // API Server will successfully connect right away during the test. 
+ listener, err := net.Listen("tcp", "localhost:") + if err != nil { + t.Fatal(err) + } + + encryptionConfigFile, err := os.CreateTemp("", "encryption-config.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(encryptionConfigFile.Name()) + + if err := os.WriteFile(encryptionConfigFile.Name(), []byte(` +apiVersion: apiserver.config.k8s.io/v1 +kind: EncryptionConfiguration +resources: + - resources: + - secrets + providers: + - kms: + apiVersion: v2 + name: kms-provider + endpoint: unix:///@kms-provider.sock`), os.FileMode(0755)); err != nil { + t.Fatal(err) + } + + // Write the configuration for tracing to a file + tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tracingConfigFile.Name()) + + if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(` +apiVersion: apiserver.config.k8s.io/v1beta1 +kind: TracingConfiguration +samplingRatePerMillion: 1000000 +endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { + t.Fatal(err) + } + + srv := grpc.NewServer() + fakeServer := &traceServer{t: t} + fakeServer.resetExpectations([]*spanExpectation{}) + traceservice.RegisterTraceServiceServer(srv, fakeServer) + + go func() { + if err := srv.Serve(listener); err != nil { + t.Error(err) + return + } + }() + defer srv.Stop() + + _ = kmsv2mock.NewBase64Plugin(t, "@kms-provider.sock") + + // Start the API Server with our tracing configuration + testServer := kubeapiservertesting.StartTestServerOrDie(t, + kubeapiservertesting.NewDefaultTestServerOptions(), + []string{ + "--tracing-config-file=" + tracingConfigFile.Name(), + "--encryption-provider-config=" + encryptionConfigFile.Name(), + }, + framework.SharedEtcd(), + ) + defer testServer.TearDownFn() + clientSet, err := client.NewForConfig(testServer.ClientConfig) + if err != nil { + t.Fatal(err) + } + + for _, tc := range []struct { + desc string + apiCall func(client.Interface) error + expectedTrace 
[]*spanExpectation + }{ + { + desc: "create secret", + apiCall: func(c client.Interface) error { // c is the client passed in by the subtest loop; err stays local to the closure + _, err := c.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(), + &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{}) + return err + }, + expectedTrace: []*spanExpectation{ + { + name: "TransformToStorage with envelopeTransformer", + attributes: map[string]func(*commonv1.AnyValue) bool{ + "transformer.provider.name": func(v *commonv1.AnyValue) bool { + return v.GetStringValue() == "kms-provider" + }, + }, + events: []string{ + "About to encrypt data using DEK", + "Data encryption succeeded", + "About to encode encrypted object", + "Encoded encrypted object", + }, + }, + }, + }, + { + desc: "get secret", + apiCall: func(c client.Interface) error { + // This depends on the "create secret" step having completed successfully + _, err := c.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{}) + return err + }, + expectedTrace: []*spanExpectation{ + { + name: "TransformFromStorage with envelopeTransformer", + attributes: map[string]func(*commonv1.AnyValue) bool{ + "transformer.provider.name": func(v *commonv1.AnyValue) bool { + return v.GetStringValue() == "kms-provider" + }, + }, + events: []string{ + "About to decode encrypted object", + "Decoded encrypted object", + "About to decrypt data using DEK", + "Data decryption succeeded", + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fakeServer.resetExpectations(tc.expectedTrace) + + // Make our call to the API server + if err := tc.apiCall(clientSet); err != nil { + t.Fatal(err) + } + + // Wait for a span to be recorded from our request + select { + case <-fakeServer.traceFound: + case <-time.After(30 * time.Second): + t.Fatal("Timed out waiting for trace") + } + }) + } +} + func TestAPIServerTracingWithEgressSelector(t *testing.T) { // Listen for traces from the API 
Server before starting it, so the // API Server will successfully connect right away during the test.