mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
add otel tracing to latency filters
This commit is contained in:
parent
d86c013b0d
commit
ed1610ad15
@ -22,6 +22,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/endpoints/metrics"
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/server/httplog"
|
"k8s.io/apiserver/pkg/server/httplog"
|
||||||
@ -54,8 +56,8 @@ func requestFilterRecordFrom(ctx context.Context) *requestFilterRecord {
|
|||||||
|
|
||||||
// TrackStarted measures the timestamp the given handler has started execution
|
// TrackStarted measures the timestamp the given handler has started execution
|
||||||
// by attaching a handler to the chain.
|
// by attaching a handler to the chain.
|
||||||
func TrackStarted(handler http.Handler, name string) http.Handler {
|
func TrackStarted(handler http.Handler, tp trace.TracerProvider, name string) http.Handler {
|
||||||
return trackStarted(handler, name, clock.RealClock{})
|
return trackStarted(handler, tp, name, clock.RealClock{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// TrackCompleted measures the timestamp the given handler has completed execution and then
|
// TrackCompleted measures the timestamp the given handler has completed execution and then
|
||||||
@ -70,7 +72,9 @@ func TrackCompleted(handler http.Handler) http.Handler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) http.Handler {
|
func trackStarted(handler http.Handler, tp trace.TracerProvider, name string, clock clock.PassiveClock) http.Handler {
|
||||||
|
// This is a noop if the tracing is disabled, since tp will be a NoopTracerProvider
|
||||||
|
tracer := tp.Tracer("k8s.op/apiserver/pkg/endpoints/filterlatency")
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
||||||
@ -85,6 +89,7 @@ func trackStarted(handler http.Handler, name string, clock clock.PassiveClock) h
|
|||||||
name: name,
|
name: name,
|
||||||
startedTimestamp: clock.Now(),
|
startedTimestamp: clock.Now(),
|
||||||
}
|
}
|
||||||
|
ctx, _ = tracer.Start(ctx, name)
|
||||||
r = r.WithContext(withRequestFilterRecord(ctx, fr))
|
r = r.WithContext(withRequestFilterRecord(ctx, fr))
|
||||||
handler.ServeHTTP(w, r)
|
handler.ServeHTTP(w, r)
|
||||||
})
|
})
|
||||||
@ -101,5 +106,6 @@ func trackCompleted(handler http.Handler, clock clock.PassiveClock, action func(
|
|||||||
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
if fr := requestFilterRecordFrom(ctx); fr != nil {
|
||||||
action(ctx, fr, completedAt)
|
action(ctx, fr, completedAt)
|
||||||
}
|
}
|
||||||
|
trace.SpanFromContext(ctx).End()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,10 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
testingclock "k8s.io/utils/clock/testing"
|
testingclock "k8s.io/utils/clock/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,7 +45,7 @@ func TestTrackStartedWithContextAlreadyHasFilterRecord(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
requestFilterStarted := time.Now()
|
requestFilterStarted := time.Now()
|
||||||
wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted))
|
wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted))
|
||||||
|
|
||||||
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
|
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -84,7 +88,7 @@ func TestTrackStartedWithContextDoesNotHaveFilterRecord(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
requestFilterStarted := time.Now()
|
requestFilterStarted := time.Now()
|
||||||
wrapped := trackStarted(handler, filterName, testingclock.NewFakeClock(requestFilterStarted))
|
wrapped := trackStarted(handler, trace.NewNoopTracerProvider(), filterName, testingclock.NewFakeClock(requestFilterStarted))
|
||||||
|
|
||||||
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
|
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -176,3 +180,39 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) {
|
|||||||
t.Errorf("expected the callback to not be invoked, but was actually invoked %d times", actionCallCount)
|
t.Errorf("expected the callback to not be invoked, but was actually invoked %d times", actionCallCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStartedAndCompletedOpenTelemetryTracing(t *testing.T) {
|
||||||
|
filterName := "my-filter"
|
||||||
|
// Seup OTel for testing
|
||||||
|
fakeRecorder := tracetest.NewSpanRecorder()
|
||||||
|
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder))
|
||||||
|
|
||||||
|
// base handler func
|
||||||
|
var callCount int
|
||||||
|
handler := http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {
|
||||||
|
// we expect the handler to be invoked just once.
|
||||||
|
callCount++
|
||||||
|
})
|
||||||
|
// wrap with start and completed handler
|
||||||
|
wrapped := TrackCompleted(handler)
|
||||||
|
wrapped = TrackStarted(wrapped, tp, filterName)
|
||||||
|
|
||||||
|
testRequest, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create new http request - %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wrapped.ServeHTTP(httptest.NewRecorder(), testRequest)
|
||||||
|
|
||||||
|
if callCount != 1 {
|
||||||
|
t.Errorf("expected the given handler to be invoked once, but was actually invoked %d times", callCount)
|
||||||
|
}
|
||||||
|
output := fakeRecorder.Ended()
|
||||||
|
if len(output) != 1 {
|
||||||
|
t.Fatalf("got %d; expected len(output) == 1", len(output))
|
||||||
|
}
|
||||||
|
span := output[0]
|
||||||
|
if span.Name() != filterName {
|
||||||
|
t.Fatalf("got %s; expected span.Name == my-filter", span.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -819,7 +819,7 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
|
|||||||
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||||
handler := filterlatency.TrackCompleted(apiHandler)
|
handler := filterlatency.TrackCompleted(apiHandler)
|
||||||
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
|
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
|
||||||
handler = filterlatency.TrackStarted(handler, "authorization")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")
|
||||||
|
|
||||||
if c.FlowControl != nil {
|
if c.FlowControl != nil {
|
||||||
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
|
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
|
||||||
@ -827,18 +827,18 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
|
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
|
||||||
handler = filterlatency.TrackCompleted(handler)
|
handler = filterlatency.TrackCompleted(handler)
|
||||||
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
|
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
|
||||||
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
|
||||||
} else {
|
} else {
|
||||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
|
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler = filterlatency.TrackCompleted(handler)
|
handler = filterlatency.TrackCompleted(handler)
|
||||||
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
|
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
|
||||||
handler = filterlatency.TrackStarted(handler, "impersonation")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")
|
||||||
|
|
||||||
handler = filterlatency.TrackCompleted(handler)
|
handler = filterlatency.TrackCompleted(handler)
|
||||||
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
|
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
|
||||||
handler = filterlatency.TrackStarted(handler, "audit")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")
|
||||||
|
|
||||||
failedHandler := genericapifilters.Unauthorized(c.Serializer)
|
failedHandler := genericapifilters.Unauthorized(c.Serializer)
|
||||||
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
|
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
|
||||||
@ -846,7 +846,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
|||||||
failedHandler = filterlatency.TrackCompleted(failedHandler)
|
failedHandler = filterlatency.TrackCompleted(failedHandler)
|
||||||
handler = filterlatency.TrackCompleted(handler)
|
handler = filterlatency.TrackCompleted(handler)
|
||||||
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
|
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
|
||||||
handler = filterlatency.TrackStarted(handler, "authentication")
|
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authentication")
|
||||||
|
|
||||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
|
"k8s.io/component-base/tracing"
|
||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -302,6 +303,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
|
|||||||
RequestTimeout: 10 * time.Second,
|
RequestTimeout: 10 * time.Second,
|
||||||
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
LongRunningFunc: func(_ *http.Request, _ *request.RequestInfo) bool { return false },
|
||||||
lifecycleSignals: newLifecycleSignals(),
|
lifecycleSignals: newLifecycleSignals(),
|
||||||
|
TracerProvider: tracing.NewNoopTracerProvider(),
|
||||||
}
|
}
|
||||||
|
|
||||||
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
h := DefaultBuildHandlerChain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
Loading…
Reference in New Issue
Block a user