From ed1610ad15f91b72017c5d69dc4f7d59a17c270f Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 20 Oct 2022 16:17:02 +0000 Subject: [PATCH] add otel tracing to latency filters --- .../endpoints/filterlatency/filterlatency.go | 12 +++-- .../filterlatency/filterlatency_test.go | 44 ++++++++++++++++++- .../src/k8s.io/apiserver/pkg/server/config.go | 10 ++--- .../apiserver/pkg/server/config_test.go | 2 + 4 files changed, 58 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go index 40e32a1a190..f2bbfe54371 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go @@ -22,6 +22,8 @@ import ( "net/http" "time" + "go.opentelemetry.io/otel/trace" + "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/httplog" @@ -54,8 +56,8 @@ func requestFilterRecordFrom(ctx context.Context) *requestFilterRecord { // TrackStarted measures the timestamp the given handler has started execution // by attaching a handler to the chain. -func TrackStarted(handler http.Handler, name string) http.Handler { - return trackStarted(handler, name, clock.RealClock{}) +func TrackStarted(handler http.Handler, tp trace.TracerProvider, name string) http.Handler { + return trackStarted(handler, tp, name, clock.RealClock{}) } // TrackCompleted measures the timestamp the given handler has completed execution and then @@ -70,7 +72,9 @@ func TrackCompleted(handler http.Handler) http.Handler { }) } -func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) http.Handler { +func trackStarted(handler http.Handler, tp trace.TracerProvider, name string, clock clock.PassiveClock) http.Handler { + // This is a noop if the tracing is disabled, since tp will be a NoopTracerProvider + tracer := tp.Tracer("k8s.op/apiserver/pkg/endpoints/filterlatency") return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() if fr := requestFilterRecordFrom(ctx); fr != nil { @@ -85,6 +89,7 @@ func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) h name: name, startedTimestamp: clock.Now(), } + ctx, _ = tracer.Start(ctx, name) r = r.WithContext(withRequestFilterRecord(ctx, fr)) handler.ServeHTTP(w, r) }) @@ -101,5 +106,6 @@ func trackCompleted(handler http.Handler, clock clock.PassiveClock, action func( if fr := requestFilterRecordFrom(ctx); fr != nil { action(ctx, fr, completedAt) } + trace.SpanFromContext(ctx).End() }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go index db900145aa1..9ab14240e37 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go @@ -23,6 +23,10 @@ import ( "testing" "time" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + testingclock "k8s.io/utils/clock/testing" ) @@ -41,7 +45,7 @@ func TestTrackStartedWithContextAlreadyHasFilterRecord(t *testing.T) { }) requestFilterStarted := time.Now() - wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted)) + wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted)) testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) if err != nil { @@ -84,7 +88,7 @@ func TestTrackStartedWithContextDoesNotHaveFilterRecord(t *testing.T) { }) requestFilterStarted := time.Now() - wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted)) + wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted)) testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) if err != nil { @@ -176,3 +180,39 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) { t.Errorf("expected the callback to not be invoked, but was actually invoked %d times", actionCallCount) } } + +func TestStartedAndCompletedOpenTelemetryTracing(t *testing.T) { + filterName := "my-filter" + // Seup OTel for testing + fakeRecorder := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder)) + + // base handler func + var callCount int + handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { + // we expect the handler to be invoked just once. + callCount++ + }) + // wrap with start and completed handler + wrapped := TrackCompleted(handler) + wrapped = TrackStarted(wrapped, tp, filterName) + + testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil) + if err != nil { + t.Fatalf("failed to create new http request - %v", err) + } + + wrapped.ServeHTTP(httptest.NewRecorder(), testRequest) + + if callCount != 1 { + t.Errorf("expected the given handler to be invoked once, but was actually invoked %d times", callCount) + } + output := fakeRecorder.Ended() + if len(output) != 1 { + t.Fatalf("got %d; expected len(output) == 1", len(output)) + } + span := output[0] + if span.Name() != filterName { + t.Fatalf("got %s; expected span.Name == my-filter", span.Name()) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 27aeeeef292..df0335eb87d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -819,7 +819,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := filterlatency.TrackCompleted(apiHandler) handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer) - handler = filterlatency.TrackStarted(handler, "authorization") + handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization") if c.FlowControl != nil { workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig() @@ -827,18 +827,18 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg) handler = filterlatency.TrackCompleted(handler) handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator) - handler = filterlatency.TrackStarted(handler, "priorityandfairness") + handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness") } else { handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc) } handler = filterlatency.TrackCompleted(handler) handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer) - handler = filterlatency.TrackStarted(handler, "impersonation") + handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation") handler = filterlatency.TrackCompleted(handler) handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc) - handler = filterlatency.TrackStarted(handler, "audit") + handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit") failedHandler := genericapifilters.Unauthorized(c.Serializer) failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator) @@ -846,7 +846,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { failedHandler = filterlatency.TrackCompleted(failedHandler) handler = filterlatency.TrackCompleted(handler) handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences) - handler = filterlatency.TrackStarted(handler, "authentication") + handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authentication") handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 25d547d6678..6da9f3bb683 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" + "k8s.io/component-base/tracing" netutils "k8s.io/utils/net" ) @@ -302,6 +303,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) { RequestTimeout: 10 * time.Second, LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false }, lifecycleSignals: newLifecycleSignals(), + TracerProvider: tracing.NewNoopTracerProvider(), } h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {