From 92da422427f523c11b21a85b77ee98379136ef65 Mon Sep 17 00:00:00 2001 From: Todd Treece Date: Thu, 4 Apr 2024 12:25:40 -0400 Subject: [PATCH] Add tracing to aggregator proxyHandler --- .../pkg/apiserver/apiserver.go | 6 + .../pkg/apiserver/handler_proxy.go | 11 ++ .../pkg/apiserver/handler_proxy_test.go | 116 ++++++++++++++++++ 3 files changed, 133 insertions(+) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 16ab2bdfa6a..bfd1135e787 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -37,6 +37,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/client-go/transport" + "k8s.io/component-base/tracing" "k8s.io/component-base/version" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -169,6 +170,9 @@ type APIAggregator struct { // rejectForwardingRedirects is whether to allow to forward redirect response rejectForwardingRedirects bool + + // tracerProvider is used to wrap the proxy transport and handler with tracing + tracerProvider tracing.TracerProvider } // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. @@ -239,6 +243,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg openAPIV3Config: c.GenericConfig.OpenAPIV3Config, proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil }, rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects, + tracerProvider: c.GenericConfig.TracerProvider, } // used later to filter the served resource by those that have expired. @@ -518,6 +523,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error { proxyTransportDial: s.proxyTransportDial, serviceResolver: s.serviceResolver, rejectForwardingRedirects: s.rejectForwardingRedirects, + tracerProvider: s.tracerProvider, } proxyHandler.updateAPIService(apiService) if s.openAPIAggregationController != nil { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 8a647a6451c..a59974f3005 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -27,10 +27,13 @@ import ( "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" "k8s.io/apiserver/pkg/util/x509metrics" "k8s.io/client-go/transport" + "k8s.io/component-base/tracing" "k8s.io/klog/v2" apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -59,6 +62,9 @@ type proxyHandler struct { // reject to forward redirect response rejectForwardingRedirects bool + + // tracerProvider is used to wrap the proxy transport and handler with tracing + tracerProvider tracing.TracerProvider } type proxyHandlingInfo struct { @@ -155,6 +161,11 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper) + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && !upgrade { + tracingWrapper := tracing.WrapperFor(r.tracerProvider) + proxyRoundTripper = tracingWrapper(proxyRoundTripper) + } + // If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does // NOT use the proxyRoundTripper. It's a direct dial that bypasses the proxyRoundTripper. This means that we have to // attach the "correct" user headers to the request ahead of time. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 9372a3e0c92..ab2ccea11f2 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -40,12 +40,17 @@ import ( "golang.org/x/net/websocket" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/filters" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/egressselector" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" @@ -54,6 +59,7 @@ import ( "k8s.io/component-base/metrics/legacyregistry" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) type targetHTTPHandler struct { @@ -774,6 +780,116 @@ func TestGetContextForNewRequest(t *testing.T) { } +func TestTracerProvider(t *testing.T) { + fakeRecorder := tracetest.NewSpanRecorder() + otelTracer := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder)) + target := &targetHTTPHandler{} + user := &user.DefaultInfo{ + Name: "username", + Groups: []string{"one", "two"}, + } + path := "/request/path" + apiService := &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"}, + Spec: apiregistration.APIServiceSpec{ + Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: ptr.To(int32(443))}, + Group: "foo", + Version: "v1", + CABundle: testCACrt, + }, + Status: apiregistration.APIServiceStatus{ + Conditions: []apiregistration.APIServiceCondition{ + {Type: apiregistration.Available, Status: apiregistration.ConditionTrue}, + }, + }, + } + targetServer := httptest.NewUnstartedServer(target) + serviceCert := svcCrt + if cert, err := tls.X509KeyPair(serviceCert, svcKey); err != nil { + t.Fatalf("TestTracerProvider: failed to parse key pair: %v", err) + } else { + targetServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}} + } + targetServer.StartTLS() + defer targetServer.Close() + + serviceResolver := &mockedRouter{destinationHost: targetServer.Listener.Addr().String()} + handler := &proxyHandler{ + localDelegate: http.NewServeMux(), + serviceResolver: serviceResolver, + proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, + tracerProvider: otelTracer, + } + + server := httptest.NewServer(contextHandler(filters.WithTracing(handler, otelTracer), user)) + defer server.Close() + + handler.updateAPIService(apiService) + curr := handler.handlingInfo.Load().(proxyHandlingInfo) + handler.handlingInfo.Store(curr) + var propagator propagation.TraceContext + req, err := http.NewRequest(http.MethodGet, server.URL+path, nil) + if err != nil { + t.Errorf("expected new request: %v", err) + return + } + + t.Logf("Sending request: %v", req) + _, err = http.DefaultClient.Do(req) + if err != nil { + t.Errorf("http request failed: %v", err) + return + } + + t.Log("Ensure the target server received the traceparent header") + id, ok := target.headers["Traceparent"] + if !ok { + t.Error("expected traceparent header") + return + } + + t.Log("Get the span context from the traceparent header") + h := http.Header{ + "Traceparent": id, + } + ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(h)) + span := trace.SpanFromContext(ctx) + + t.Log("Ensure that the span context is valid and remote") + if !span.SpanContext().IsValid() { + t.Error("expected valid span context") + return + } + + if !span.SpanContext().IsRemote() { + t.Error("expected remote span context") + return + } + + t.Log("Ensure that the span ID and trace ID match the expected values") + expectedSpanCtx := fakeRecorder.Ended()[0].SpanContext() + if expectedSpanCtx.TraceID() != span.SpanContext().TraceID() { + t.Errorf("expected trace id to match. expected: %v, but got %v", expectedSpanCtx.TraceID(), span.SpanContext().TraceID()) + return + } + + if expectedSpanCtx.SpanID() != span.SpanContext().SpanID() { + t.Errorf("expected span id to match. expected: %v, but got: %v", expectedSpanCtx.SpanID(), span.SpanContext().SpanID()) + return + } + + t.Log("Ensure that the expected spans were recorded when sending a request through the proxy") + expectedSpanNames := []string{"HTTP GET", "GET"} + spanNames := []string{} + for _, span := range fakeRecorder.Ended() { + spanNames = append(spanNames, span.Name()) + } + if e, a := expectedSpanNames, spanNames; !reflect.DeepEqual(e, a) { + t.Errorf("expected span names %v, got %v", e, a) + return + } +} + func TestNewRequestForProxyWithAuditID(t *testing.T) { tests := []struct { name string