diff --git a/go.mod b/go.mod index 2b1b6ea642c..a54d5824625 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.16 go.etcd.io/etcd/client/v3 v3.5.16 go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.42.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 go.opentelemetry.io/otel v1.33.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 go.opentelemetry.io/otel/metric v1.33.0 @@ -205,7 +206,6 @@ require ( go.etcd.io/etcd/server/v3 v3.5.16 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go index 6e36ffec862..a82edd45690 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel/trace" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/authentication/user" tracing "k8s.io/component-base/tracing" ) @@ -31,7 +32,7 @@ import ( func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler { opts := []otelhttp.Option{ otelhttp.WithPropagators(tracing.Propagators()), - otelhttp.WithPublicEndpoint(), + otelhttp.WithPublicEndpointFn(notSystemPrivilegedGroup), otelhttp.WithTracerProvider(tp), otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string { ctx := r.Context() @@ -43,6 +44,11 @@ func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler { }), } wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Adjust otelhttp tracing start time to match the start time used + // for Prometheus metrics. + if startTime, ok := request.ReceivedTimestampFrom(r.Context()); ok { + r = r.WithContext(otelhttp.ContextWithStartTime(r.Context(), startTime)) + } // Add the http.target attribute to the otelhttp span // Workaround for https://github.com/open-telemetry/opentelemetry-go-contrib/issues/3743 if r.URL != nil { @@ -73,3 +79,14 @@ func getSpanNameFromRequestInfo(info *request.RequestInfo, r *http.Request) stri } return r.Method + " " + spanName } + +func notSystemPrivilegedGroup(req *http.Request) bool { + if u, ok := request.UserFrom(req.Context()); ok { + for _, group := range u.GetGroups() { + if group == user.SystemPrivilegedGroup || group == user.MonitoringGroup { + return false + } + } + } + return true +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index cdde63812d5..f2086239d65 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -1039,6 +1039,11 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { failedHandler := genericapifilters.Unauthorized(c.Serializer) failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator) + // WithTracing comes after authentication so we can allow authenticated + // clients to influence sampling. + if c.FeatureGate.Enabled(genericfeatures.APIServerTracing) { + handler = genericapifilters.WithTracing(handler, c.TracerProvider) + } failedHandler = filterlatency.TrackCompleted(failedHandler) handler = filterlatency.TrackCompleted(handler) handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences, c.Authentication.RequestHeaderConfig) @@ -1069,9 +1074,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled()) } handler = genericfilters.WithHTTPLogging(handler) - if c.FeatureGate.Enabled(genericfeatures.APIServerTracing) { - handler = genericapifilters.WithTracing(handler, c.TracerProvider) - } handler = genericapifilters.WithLatencyTrackers(handler) // WithRoutine will execute future handlers in a separate goroutine and serving // handler in current goroutine to minimize the stack memory usage. It must be diff --git a/test/integration/apiserver/tracing/tracing_test.go b/test/integration/apiserver/tracing/tracing_test.go index 565d8ef5cf6..973fe66ecc2 100644 --- a/test/integration/apiserver/tracing/tracing_test.go +++ b/test/integration/apiserver/tracing/tracing_test.go @@ -24,13 +24,17 @@ import ( "encoding/hex" "encoding/json" "fmt" + "math/rand" "net" + "net/http" "os" "strings" "sync" "testing" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/trace" traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" tracev1 "go.opentelemetry.io/proto/otlp/trace/v1" @@ -42,6 +46,7 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2" client "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" utiltesting "k8s.io/client-go/util/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" @@ -85,14 +90,13 @@ resources: if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(` apiVersion: apiserver.config.k8s.io/v1beta1 kind: TracingConfiguration -samplingRatePerMillion: 1000000 endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { t.Fatal(err) } srv := grpc.NewServer() fakeServer := &traceServer{t: t} - fakeServer.resetExpectations([]*spanExpectation{}) + fakeServer.resetExpectations([]*spanExpectation{}, trace.TraceID{}) traceservice.RegisterTraceServiceServer(srv, fakeServer) go func() { @@ -122,13 +126,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { for _, tc := range []struct { desc string - apiCall func(client.Interface) error + apiCall func(context.Context, client.Interface) error expectedTrace []*spanExpectation }{ { desc: "create secret", - apiCall: func(c client.Interface) error { - _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(), + apiCall: func(ctx context.Context, c client.Interface) error { + _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(ctx, &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{}) return err }, @@ -151,9 +155,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "get secret", - apiCall: func(c client.Interface) error { + apiCall: func(ctx context.Context, c client.Interface) error { // This depends on the "create secret" step having completed successfully - _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{}) + _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(ctx, "fake", metav1.GetOptions{}) return err }, expectedTrace: []*spanExpectation{ @@ -175,10 +179,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, } { t.Run(tc.desc, func(t *testing.T) { - fakeServer.resetExpectations(tc.expectedTrace) + ctx, traceID := sampledContext() + fakeServer.resetExpectations(tc.expectedTrace, traceID) // Make our call to the API server - if err := tc.apiCall(clientSet); err != nil { + if err := tc.apiCall(ctx, clientSet); err != nil { t.Fatal(err) } @@ -253,7 +258,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { } } -func TestAPIServerTracing(t *testing.T) { +func TestUnauthenticatedAPIServerTracing(t *testing.T) { // Listen for traces from the API Server before starting it, so the // API Server will successfully connect right away during the test. listener, err := net.Listen("tcp", "localhost:") @@ -270,19 +275,91 @@ func TestAPIServerTracing(t *testing.T) { if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(` apiVersion: apiserver.config.k8s.io/v1beta1 kind: TracingConfiguration -samplingRatePerMillion: 1000000 endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { t.Fatal(err) } srv := grpc.NewServer() fakeServer := &traceServer{t: t} - fakeServer.resetExpectations([]*spanExpectation{}) + fakeServer.resetExpectations([]*spanExpectation{{}}, trace.TraceID{}) traceservice.RegisterTraceServiceServer(srv, fakeServer) go srv.Serve(listener) defer srv.Stop() + // Start the API Server with our tracing configuration + testServer := kubeapiservertesting.StartTestServerOrDie(t, + kubeapiservertesting.NewDefaultTestServerOptions(), + []string{"--tracing-config-file=" + tracingConfigFile.Name()}, + framework.SharedEtcd(), + ) + defer testServer.TearDownFn() + + ctx, testTraceID := sampledContext() + // Match any span that has the tests' Trace ID + fakeServer.resetExpectations([]*spanExpectation{{}}, testTraceID) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, testServer.ClientConfig.Host+"/healthz", nil) + if err != nil { + t.Fatal(err) + } + unauthenticatedConfig := rest.CopyConfig(testServer.ClientConfig) + // Remove the bearer token from the request to make it unauthenticated. + unauthenticatedConfig.BearerToken = "" + transport, err := rest.TransportFor(unauthenticatedConfig) + if err != nil { + t.Fatal(err) + } + client := &http.Client{Transport: otelhttp.NewTransport(transport)} + if _, err = client.Do(req); err != nil { + t.Fatal(err) + } + + // Ensure we do not find any matching traces, since the request was not authenticated + select { + case <-fakeServer.traceFound: + t.Fatal("Found a trace when none was expected") + case <-time.After(10 * time.Second): + } +} + +func TestAPIServerTracing(t *testing.T) { + // Listen for traces from the API Server before starting it, so the + // API Server will successfully connect right away during the test. + listener, err := net.Listen("tcp", "localhost:") + if err != nil { + t.Fatal(err) + } + // Write the configuration for tracing to a file + tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml") + if err != nil { + t.Fatal(err) + } + defer func() { + if err = os.Remove(tracingConfigFile.Name()); err != nil { + t.Error(err) + } + }() + + if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(` +apiVersion: apiserver.config.k8s.io/v1beta1 +kind: TracingConfiguration +endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { + t.Fatal(err) + } + + srv := grpc.NewServer() + fakeServer := &traceServer{t: t} + fakeServer.resetExpectations([]*spanExpectation{}, trace.TraceID{}) + traceservice.RegisterTraceServiceServer(srv, fakeServer) + + go func() { + if err = srv.Serve(listener); err != nil { + t.Error(err) + } + }() + defer srv.Stop() + // Start the API Server with our tracing configuration testServer := kubeapiservertesting.StartTestServerOrDie(t, kubeapiservertesting.NewDefaultTestServerOptions(), @@ -297,13 +374,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { for _, tc := range []struct { desc string - apiCall func(*client.Clientset) error + apiCall func(context.Context) error expectedTrace []*spanExpectation }{ { desc: "create node", - apiCall: func(c *client.Clientset) error { - _, err = clientSet.CoreV1().Nodes().Create(context.Background(), + apiCall: func(ctx context.Context) error { + _, err = clientSet.CoreV1().Nodes().Create(ctx, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake"}}, metav1.CreateOptions{}) return err }, @@ -322,9 +399,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "Create", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -421,9 +495,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "get node", - apiCall: func(c *client.Clientset) error { + apiCall: func(ctx context.Context) error { // This depends on the "create node" step having completed successfully - _, err = clientSet.CoreV1().Nodes().Get(context.Background(), "fake", metav1.GetOptions{}) + _, err = clientSet.CoreV1().Nodes().Get(ctx, "fake", metav1.GetOptions{}) return err }, expectedTrace: []*spanExpectation{ @@ -441,9 +515,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "Get", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -529,8 +600,8 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "list nodes", - apiCall: func(c *client.Clientset) error { - _, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + apiCall: func(ctx context.Context) error { + _, err = clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) return err }, expectedTrace: []*spanExpectation{ @@ -548,9 +619,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "List", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -580,20 +648,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, { - name: "cacher.GetList", + name: "etcdserverpb.KV/Range", attributes: map[string]func(*commonv1.AnyValue) bool{ - "audit-id": func(v *commonv1.AnyValue) bool { - return v.GetStringValue() != "" - }, - "type": func(v *commonv1.AnyValue) bool { - return v.GetStringValue() == "nodes" + "rpc.system": func(v *commonv1.AnyValue) bool { + return v.GetStringValue() == "grpc" }, }, - events: []string{ - "Ready", - "Listed items from cache", - "Filtered items", - }, + events: []string{"message"}, }, { name: "SerializeObject", @@ -626,9 +687,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "update node", - apiCall: func(c *client.Clientset) error { + apiCall: func(ctx context.Context) error { // This depends on the "create node" step having completed successfully - _, err = clientSet.CoreV1().Nodes().Update(context.Background(), + _, err = clientSet.CoreV1().Nodes().Update(ctx, &v1.Node{ObjectMeta: metav1.ObjectMeta{ Name: "fake", Annotations: map[string]string{"foo": "bar"}, @@ -650,9 +711,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "Update", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -752,7 +810,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "patch node", - apiCall: func(c *client.Clientset) error { + apiCall: func(ctx context.Context) error { // This depends on the "create node" step having completed successfully oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{ Name: "fake", @@ -776,7 +834,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { if err != nil { return err } - _, err = clientSet.CoreV1().Nodes().Patch(context.Background(), "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + _, err = clientSet.CoreV1().Nodes().Patch(ctx, "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) return err }, expectedTrace: []*spanExpectation{ @@ -794,9 +852,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "Patch", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -896,9 +951,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, { desc: "delete node", - apiCall: func(c *client.Clientset) error { + apiCall: func(ctx context.Context) error { // This depends on the "create node" step having completed successfully - return clientSet.CoreV1().Nodes().Delete(context.Background(), "fake", metav1.DeleteOptions{}) + return clientSet.CoreV1().Nodes().Delete(ctx, "fake", metav1.DeleteOptions{}) }, expectedTrace: []*spanExpectation{ { @@ -915,9 +970,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, }, }, - { - name: "authentication", - }, { name: "Delete", attributes: map[string]func(*commonv1.AnyValue) bool{ @@ -990,10 +1042,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { }, } { t.Run(tc.desc, func(t *testing.T) { - fakeServer.resetExpectations(tc.expectedTrace) + ctx, testTraceID := sampledContext() + fakeServer.resetExpectations(tc.expectedTrace, testTraceID) // Make our call to the API server - if err := tc.apiCall(clientSet); err != nil { + if err := tc.apiCall(ctx); err != nil { t.Fatal(err) } @@ -1001,6 +1054,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { select { case <-fakeServer.traceFound: case <-time.After(30 * time.Second): + for _, spanExpectation := range fakeServer.expectations { + if !spanExpectation.met { + t.Logf("Unmet expectation: %s", spanExpectation.name) + } + } t.Fatal("Timed out waiting for trace") } }) @@ -1016,13 +1074,14 @@ type traceServer struct { lock sync.Mutex traceFound chan struct{} expectations traceExpectation + testTraceID trace.TraceID } func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) { t.lock.Lock() defer t.lock.Unlock() - t.expectations.update(req) + t.expectations.update(req, t.testTraceID) // if all expectations are met, notify the test scenario by closing traceFound if t.expectations.met() { select { @@ -1037,11 +1096,12 @@ func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceS // resetExpectations is used by a new test scenario to set new expectations for // the test server. -func (t *traceServer) resetExpectations(newExpectations traceExpectation) { +func (t *traceServer) resetExpectations(newExpectations traceExpectation, traceID trace.TraceID) { t.lock.Lock() defer t.lock.Unlock() t.traceFound = make(chan struct{}) t.expectations = newExpectations + t.testTraceID = traceID } // traceExpectation is an expectation for an entire trace @@ -1050,25 +1110,8 @@ type traceExpectation []*spanExpectation // met returns true if all span expectations the server is looking for have // been satisfied. func (t traceExpectation) met() bool { - if len(t) == 0 { - return true - } - // we want to find any trace ID which all span IDs contain. - // try each trace ID met by the first span. - possibleTraceIDs := t[0].metTraceIDs - for _, tid := range possibleTraceIDs { - if t.contains(tid) { - return true - } - } - return false -} - -// contains returns true if the all spans in the trace expectation contain the -// trace ID -func (t traceExpectation) contains(checkTID string) bool { - for _, expectation := range t { - if !expectation.contains(checkTID) { + for _, se := range t { + if !se.met { return false } } @@ -1077,20 +1120,23 @@ func (t traceExpectation) contains(checkTID string) bool { // update finds all expectations that are met by a span in the // incoming request. -func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest) { +func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest, traceID trace.TraceID) { for _, resourceSpans := range req.GetResourceSpans() { for _, instrumentationSpans := range resourceSpans.GetScopeSpans() { for _, span := range instrumentationSpans.GetSpans() { - t.updateForSpan(span) + t.updateForSpan(span, traceID) } } } } // updateForSpan updates expectations based on a single incoming span. -func (t traceExpectation) updateForSpan(span *tracev1.Span) { +func (t traceExpectation) updateForSpan(span *tracev1.Span, traceID trace.TraceID) { + if hex.EncodeToString(span.TraceId) != traceID.String() { + return + } for i, spanExpectation := range t { - if span.Name != spanExpectation.name { + if spanExpectation.name != "" && span.Name != spanExpectation.name { continue } if !spanExpectation.attributes.matches(span.GetAttributes()) { @@ -1099,7 +1145,7 @@ func (t traceExpectation) updateForSpan(span *tracev1.Span) { if !spanExpectation.events.matches(span.GetEvents()) { continue } - t[i].metTraceIDs = append(spanExpectation.metTraceIDs, hex.EncodeToString(span.TraceId[:])) + t[i].met = true } } @@ -1109,18 +1155,7 @@ type spanExpectation struct { name string attributes attributeExpectation events eventExpectation - // For each trace ID that meets this expectation, record it here. - // This way, we can ensure that all spans that should be in the same trace have the same trace ID - metTraceIDs []string -} - -func (s *spanExpectation) contains(tid string) bool { - for _, metTID := range s.metTraceIDs { - if tid == metTID { - return true - } - } - return false + met bool } // eventExpectation is the expectation for an event attached to a span. @@ -1158,3 +1193,18 @@ func (a attributeExpectation) matches(attrs []*commonv1.KeyValue) bool { } return true } + +var r = rand.New(rand.NewSource(time.Now().UnixNano())) + +func sampledContext() (context.Context, trace.TraceID) { + tid := trace.TraceID{} + _, _ = r.Read(tid[:]) + sid := trace.SpanID{} + _, _ = r.Read(sid[:]) + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tid, + SpanID: sid, + TraceFlags: trace.FlagsSampled, + }) + return trace.ContextWithSpanContext(context.Background(), sc), tid +}