Merge pull request #127053 from dashpole/tracing_context_propagation

APIServerTracing: Respect trace context only for privileged users
This commit is contained in:
Kubernetes Prow Robot 2025-03-20 17:10:31 -07:00 committed by GitHub
commit b2b6c4d023
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 168 additions and 99 deletions

2
go.mod
View File

@ -63,6 +63,7 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.16
go.etcd.io/etcd/client/v3 v3.5.16
go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.42.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/metric v1.33.0
@ -205,7 +206,6 @@ require (
go.etcd.io/etcd/server/v3 v3.5.16 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect

View File

@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel/trace"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/authentication/user"
tracing "k8s.io/component-base/tracing"
)
@ -31,7 +32,7 @@ import (
func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler {
opts := []otelhttp.Option{
otelhttp.WithPropagators(tracing.Propagators()),
otelhttp.WithPublicEndpoint(),
otelhttp.WithPublicEndpointFn(notSystemPrivilegedGroup),
otelhttp.WithTracerProvider(tp),
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
ctx := r.Context()
@ -43,6 +44,11 @@ func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler {
}),
}
wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Adjust otelhttp tracing start time to match the start time used
// for Prometheus metrics.
if startTime, ok := request.ReceivedTimestampFrom(r.Context()); ok {
r = r.WithContext(otelhttp.ContextWithStartTime(r.Context(), startTime))
}
// Add the http.target attribute to the otelhttp span
// Workaround for https://github.com/open-telemetry/opentelemetry-go-contrib/issues/3743
if r.URL != nil {
@ -73,3 +79,14 @@ func getSpanNameFromRequestInfo(info *request.RequestInfo, r *http.Request) stri
}
return r.Method + " " + spanName
}
// notSystemPrivilegedGroup is the predicate handed to
// otelhttp.WithPublicEndpointFn by WithTracing. It reports false only when
// the request context carries a user that is a member of
// user.SystemPrivilegedGroup or user.MonitoringGroup. Requests without a user
// in their context, or whose user belongs to neither group, report true.
func notSystemPrivilegedGroup(req *http.Request) bool {
	requester, found := request.UserFrom(req.Context())
	if !found {
		// No user attached to the request context: not privileged.
		return true
	}
	for _, g := range requester.GetGroups() {
		if g != user.SystemPrivilegedGroup && g != user.MonitoringGroup {
			continue
		}
		// First privileged membership is enough to decide.
		return false
	}
	return true
}

View File

@ -1039,6 +1039,11 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
// WithTracing comes after authentication so we can allow authenticated
// clients to influence sampling.
if c.FeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
failedHandler = filterlatency.TrackCompleted(failedHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences, c.Authentication.RequestHeaderConfig)
@ -1069,9 +1074,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if c.FeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
handler = genericapifilters.WithLatencyTrackers(handler)
// WithRoutine will execute future handlers in a separate goroutine and serving
// handler in current goroutine to minimize the stack memory usage. It must be

View File

@ -24,13 +24,17 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace"
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
@ -42,6 +46,7 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
utiltesting "k8s.io/client-go/util/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
@ -85,14 +90,13 @@ resources:
if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
apiVersion: apiserver.config.k8s.io/v1beta1
kind: TracingConfiguration
samplingRatePerMillion: 1000000
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
srv := grpc.NewServer()
fakeServer := &traceServer{t: t}
fakeServer.resetExpectations([]*spanExpectation{})
fakeServer.resetExpectations([]*spanExpectation{}, trace.TraceID{})
traceservice.RegisterTraceServiceServer(srv, fakeServer)
go func() {
@ -122,13 +126,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
for _, tc := range []struct {
desc string
apiCall func(client.Interface) error
apiCall func(context.Context, client.Interface) error
expectedTrace []*spanExpectation
}{
{
desc: "create secret",
apiCall: func(c client.Interface) error {
_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(),
apiCall: func(ctx context.Context, c client.Interface) error {
_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(ctx,
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{})
return err
},
@ -151,9 +155,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "get secret",
apiCall: func(c client.Interface) error {
apiCall: func(ctx context.Context, c client.Interface) error {
// This depends on the "create secret" step having completed successfully
_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{})
_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(ctx, "fake", metav1.GetOptions{})
return err
},
expectedTrace: []*spanExpectation{
@ -175,10 +179,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
} {
t.Run(tc.desc, func(t *testing.T) {
fakeServer.resetExpectations(tc.expectedTrace)
ctx, traceID := sampledContext()
fakeServer.resetExpectations(tc.expectedTrace, traceID)
// Make our call to the API server
if err := tc.apiCall(clientSet); err != nil {
if err := tc.apiCall(ctx, clientSet); err != nil {
t.Fatal(err)
}
@ -253,7 +258,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
}
}
func TestAPIServerTracing(t *testing.T) {
func TestUnauthenticatedAPIServerTracing(t *testing.T) {
// Listen for traces from the API Server before starting it, so the
// API Server will successfully connect right away during the test.
listener, err := net.Listen("tcp", "localhost:")
@ -270,19 +275,91 @@ func TestAPIServerTracing(t *testing.T) {
if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
apiVersion: apiserver.config.k8s.io/v1beta1
kind: TracingConfiguration
samplingRatePerMillion: 1000000
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
srv := grpc.NewServer()
fakeServer := &traceServer{t: t}
fakeServer.resetExpectations([]*spanExpectation{})
fakeServer.resetExpectations([]*spanExpectation{{}}, trace.TraceID{})
traceservice.RegisterTraceServiceServer(srv, fakeServer)
go srv.Serve(listener)
defer srv.Stop()
// Start the API Server with our tracing configuration
testServer := kubeapiservertesting.StartTestServerOrDie(t,
kubeapiservertesting.NewDefaultTestServerOptions(),
[]string{"--tracing-config-file=" + tracingConfigFile.Name()},
framework.SharedEtcd(),
)
defer testServer.TearDownFn()
ctx, testTraceID := sampledContext()
// Match any span that has the test's trace ID
fakeServer.resetExpectations([]*spanExpectation{{}}, testTraceID)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, testServer.ClientConfig.Host+"/healthz", nil)
if err != nil {
t.Fatal(err)
}
unauthenticatedConfig := rest.CopyConfig(testServer.ClientConfig)
// Remove the bearer token from the request to make it unauthenticated.
unauthenticatedConfig.BearerToken = ""
transport, err := rest.TransportFor(unauthenticatedConfig)
if err != nil {
t.Fatal(err)
}
client := &http.Client{Transport: otelhttp.NewTransport(transport)}
if _, err = client.Do(req); err != nil {
t.Fatal(err)
}
// Ensure we do not find any matching traces, since the request was not authenticated
select {
case <-fakeServer.traceFound:
t.Fatal("Found a trace when none was expected")
case <-time.After(10 * time.Second):
}
}
func TestAPIServerTracing(t *testing.T) {
// Listen for traces from the API Server before starting it, so the
// API Server will successfully connect right away during the test.
listener, err := net.Listen("tcp", "localhost:")
if err != nil {
t.Fatal(err)
}
// Write the configuration for tracing to a file
tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
if err != nil {
t.Fatal(err)
}
defer func() {
if err = os.Remove(tracingConfigFile.Name()); err != nil {
t.Error(err)
}
}()
if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
apiVersion: apiserver.config.k8s.io/v1beta1
kind: TracingConfiguration
endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
srv := grpc.NewServer()
fakeServer := &traceServer{t: t}
fakeServer.resetExpectations([]*spanExpectation{}, trace.TraceID{})
traceservice.RegisterTraceServiceServer(srv, fakeServer)
go func() {
if err = srv.Serve(listener); err != nil {
t.Error(err)
}
}()
defer srv.Stop()
// Start the API Server with our tracing configuration
testServer := kubeapiservertesting.StartTestServerOrDie(t,
kubeapiservertesting.NewDefaultTestServerOptions(),
@ -297,13 +374,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
for _, tc := range []struct {
desc string
apiCall func(*client.Clientset) error
apiCall func(context.Context) error
expectedTrace []*spanExpectation
}{
{
desc: "create node",
apiCall: func(c *client.Clientset) error {
_, err = clientSet.CoreV1().Nodes().Create(context.Background(),
apiCall: func(ctx context.Context) error {
_, err = clientSet.CoreV1().Nodes().Create(ctx,
&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake"}}, metav1.CreateOptions{})
return err
},
@ -322,9 +399,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "Create",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -421,9 +495,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "get node",
apiCall: func(c *client.Clientset) error {
apiCall: func(ctx context.Context) error {
// This depends on the "create node" step having completed successfully
_, err = clientSet.CoreV1().Nodes().Get(context.Background(), "fake", metav1.GetOptions{})
_, err = clientSet.CoreV1().Nodes().Get(ctx, "fake", metav1.GetOptions{})
return err
},
expectedTrace: []*spanExpectation{
@ -441,9 +515,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "Get",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -529,8 +600,8 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "list nodes",
apiCall: func(c *client.Clientset) error {
_, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
apiCall: func(ctx context.Context) error {
_, err = clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
return err
},
expectedTrace: []*spanExpectation{
@ -548,9 +619,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "List",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -580,20 +648,13 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
{
name: "cacher.GetList",
name: "etcdserverpb.KV/Range",
attributes: map[string]func(*commonv1.AnyValue) bool{
"audit-id": func(v *commonv1.AnyValue) bool {
return v.GetStringValue() != ""
},
"type": func(v *commonv1.AnyValue) bool {
return v.GetStringValue() == "nodes"
"rpc.system": func(v *commonv1.AnyValue) bool {
return v.GetStringValue() == "grpc"
},
},
events: []string{
"Ready",
"Listed items from cache",
"Filtered items",
},
events: []string{"message"},
},
{
name: "SerializeObject",
@ -626,9 +687,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "update node",
apiCall: func(c *client.Clientset) error {
apiCall: func(ctx context.Context) error {
// This depends on the "create node" step having completed successfully
_, err = clientSet.CoreV1().Nodes().Update(context.Background(),
_, err = clientSet.CoreV1().Nodes().Update(ctx,
&v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: "fake",
Annotations: map[string]string{"foo": "bar"},
@ -650,9 +711,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "Update",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -752,7 +810,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "patch node",
apiCall: func(c *client.Clientset) error {
apiCall: func(ctx context.Context) error {
// This depends on the "create node" step having completed successfully
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: "fake",
@ -776,7 +834,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
if err != nil {
return err
}
_, err = clientSet.CoreV1().Nodes().Patch(context.Background(), "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = clientSet.CoreV1().Nodes().Patch(ctx, "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
return err
},
expectedTrace: []*spanExpectation{
@ -794,9 +852,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "Patch",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -896,9 +951,9 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
{
desc: "delete node",
apiCall: func(c *client.Clientset) error {
apiCall: func(ctx context.Context) error {
// This depends on the "create node" step having completed successfully
return clientSet.CoreV1().Nodes().Delete(context.Background(), "fake", metav1.DeleteOptions{})
return clientSet.CoreV1().Nodes().Delete(ctx, "fake", metav1.DeleteOptions{})
},
expectedTrace: []*spanExpectation{
{
@ -915,9 +970,6 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
},
},
{
name: "authentication",
},
{
name: "Delete",
attributes: map[string]func(*commonv1.AnyValue) bool{
@ -990,10 +1042,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
},
} {
t.Run(tc.desc, func(t *testing.T) {
fakeServer.resetExpectations(tc.expectedTrace)
ctx, testTraceID := sampledContext()
fakeServer.resetExpectations(tc.expectedTrace, testTraceID)
// Make our call to the API server
if err := tc.apiCall(clientSet); err != nil {
if err := tc.apiCall(ctx); err != nil {
t.Fatal(err)
}
@ -1001,6 +1054,11 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
select {
case <-fakeServer.traceFound:
case <-time.After(30 * time.Second):
for _, spanExpectation := range fakeServer.expectations {
if !spanExpectation.met {
t.Logf("Unmet expectation: %s", spanExpectation.name)
}
}
t.Fatal("Timed out waiting for trace")
}
})
@ -1016,13 +1074,14 @@ type traceServer struct {
lock sync.Mutex
traceFound chan struct{}
expectations traceExpectation
testTraceID trace.TraceID
}
func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
t.lock.Lock()
defer t.lock.Unlock()
t.expectations.update(req)
t.expectations.update(req, t.testTraceID)
// if all expectations are met, notify the test scenario by closing traceFound
if t.expectations.met() {
select {
@ -1037,11 +1096,12 @@ func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceS
// resetExpectations is used by a new test scenario to set new expectations for
// the test server.
func (t *traceServer) resetExpectations(newExpectations traceExpectation) {
func (t *traceServer) resetExpectations(newExpectations traceExpectation, traceID trace.TraceID) {
t.lock.Lock()
defer t.lock.Unlock()
t.traceFound = make(chan struct{})
t.expectations = newExpectations
t.testTraceID = traceID
}
// traceExpectation is an expectation for an entire trace
@ -1050,25 +1110,8 @@ type traceExpectation []*spanExpectation
// met returns true if all span expectations the server is looking for have
// been satisfied.
func (t traceExpectation) met() bool {
if len(t) == 0 {
return true
}
// we want to find any trace ID which all span IDs contain.
// try each trace ID met by the first span.
possibleTraceIDs := t[0].metTraceIDs
for _, tid := range possibleTraceIDs {
if t.contains(tid) {
return true
}
}
return false
}
// contains returns true if all spans in the trace expectation contain the
// trace ID
func (t traceExpectation) contains(checkTID string) bool {
for _, expectation := range t {
if !expectation.contains(checkTID) {
for _, se := range t {
if !se.met {
return false
}
}
@ -1077,20 +1120,23 @@ func (t traceExpectation) contains(checkTID string) bool {
// update finds all expectations that are met by a span in the
// incoming request.
func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest) {
func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest, traceID trace.TraceID) {
for _, resourceSpans := range req.GetResourceSpans() {
for _, instrumentationSpans := range resourceSpans.GetScopeSpans() {
for _, span := range instrumentationSpans.GetSpans() {
t.updateForSpan(span)
t.updateForSpan(span, traceID)
}
}
}
}
// updateForSpan updates expectations based on a single incoming span.
func (t traceExpectation) updateForSpan(span *tracev1.Span) {
func (t traceExpectation) updateForSpan(span *tracev1.Span, traceID trace.TraceID) {
if hex.EncodeToString(span.TraceId) != traceID.String() {
return
}
for i, spanExpectation := range t {
if span.Name != spanExpectation.name {
if spanExpectation.name != "" && span.Name != spanExpectation.name {
continue
}
if !spanExpectation.attributes.matches(span.GetAttributes()) {
@ -1099,7 +1145,7 @@ func (t traceExpectation) updateForSpan(span *tracev1.Span) {
if !spanExpectation.events.matches(span.GetEvents()) {
continue
}
t[i].metTraceIDs = append(spanExpectation.metTraceIDs, hex.EncodeToString(span.TraceId[:]))
t[i].met = true
}
}
@ -1109,18 +1155,7 @@ type spanExpectation struct {
name string
attributes attributeExpectation
events eventExpectation
// For each trace ID that meets this expectation, record it here.
// This way, we can ensure that all spans that should be in the same trace have the same trace ID
metTraceIDs []string
}
func (s *spanExpectation) contains(tid string) bool {
for _, metTID := range s.metTraceIDs {
if tid == metTID {
return true
}
}
return false
met bool
}
// eventExpectation is the expectation for an event attached to a span.
@ -1158,3 +1193,18 @@ func (a attributeExpectation) matches(attrs []*commonv1.KeyValue) bool {
}
return true
}
// r is the pseudo-random source used to generate trace and span IDs for the
// tests. It is seeded from the wall clock when the package is initialized.
var r = rand.New(rand.NewSource(time.Now().UnixNano()))

// sampledContext builds a context derived from context.Background() that
// carries a span context with a randomly generated trace ID and span ID and
// the sampled flag set. It returns that context together with the generated
// trace ID so callers can look for spans belonging to it.
func sampledContext() (context.Context, trace.TraceID) {
	var (
		traceID trace.TraceID
		spanID  trace.SpanID
	)
	// (*rand.Rand).Read is documented to always return a nil error, so the
	// results are deliberately discarded.
	_, _ = r.Read(traceID[:])
	_, _ = r.Read(spanID[:])

	cfg := trace.SpanContextConfig{
		TraceID:    traceID,
		SpanID:     spanID,
		TraceFlags: trace.FlagsSampled,
	}
	parent := trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(cfg))
	return parent, traceID
}