diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index ffc65087e8f..c5c7b355d9b 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/spf13/pflag" + oteltrace "go.opentelemetry.io/otel/trace" "k8s.io/apiserver/pkg/admission" apiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/etcd3" @@ -153,10 +154,11 @@ func TestAddFlags(t *testing.T) { StorageConfig: storagebackend.Config{ Type: "etcd3", Transport: storagebackend.TransportConfig{ - ServerList: nil, - KeyFile: "/var/run/kubernetes/etcd.key", - TrustedCAFile: "/var/run/kubernetes/etcdca.crt", - CertFile: "/var/run/kubernetes/etcdce.crt", + ServerList: nil, + KeyFile: "/var/run/kubernetes/etcd.key", + TrustedCAFile: "/var/run/kubernetes/etcdca.crt", + CertFile: "/var/run/kubernetes/etcdce.crt", + TracerProvider: oteltrace.NewNoopTracerProvider(), }, Paging: true, Prefix: "/registry", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index aa2995428ba..f05bb831e11 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -31,6 +31,8 @@ import ( "github.com/spf13/cobra" + oteltrace "go.opentelemetry.io/otel/trace" + extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -417,8 +419,10 @@ func buildGenericConfig( if genericConfig.EgressSelector != nil { storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup } - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider + } else { + storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider() } if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { return diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 0c202f820e4..f1f81730853 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -39,6 +39,10 @@ import ( "k8s.io/mount-utils" cadvisorapi "github.com/google/cadvisor/info/v1" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + otelsdkresource "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/semconv" + oteltrace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" @@ -68,6 +72,7 @@ import ( logsapi "k8s.io/component-base/logs/api/v1" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + tracing "k8s.io/component-base/tracing" "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" @@ -373,6 +378,13 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea if err != nil { return nil, err } + tp := oteltrace.NewNoopTracerProvider() + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { + tp, err = newTracerProvider(s) + if err != nil { + return nil, err + } + } return &kubelet.Dependencies{ Auth: nil, // default does not enforce auth[nz] CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here @@ -381,6 +393,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea KubeClient: nil, HeartbeatClient: nil, EventClient: nil, + TracerProvider: tp, HostUtil: hu, Mounter: mounter, Subpather: subpather, @@ -563,7 +576,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend klog.InfoS("Standalone mode, no API client") case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil: - clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName) + clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName) if err != nil { return err } @@ -790,7 +803,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether // bootstrapping is enabled or client certificate rotation is enabled. -func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) { +func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) { if s.RotateCertificates { // Rules for client rotation and the handling of kube config files: // @@ -905,6 +918,9 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod utilnet.CloseIdleConnectionsFor(clientConfig.Transport) } } + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { + clientConfig.Wrap(tracing.WrapperFor(tp)) + } return clientConfig, onHeartbeatFailure, nil } @@ -1167,7 +1183,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele // start the kubelet server if enableServer { - go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth) + go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider) } if kubeCfg.ReadOnlyPort > 0 { go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort)) @@ -1250,3 +1266,24 @@ func parseResourceList(m map[string]string) (v1.ResourceList, error) { } return rl, nil } + +func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, error) { + if s.KubeletConfiguration.Tracing == nil { + return oteltrace.NewNoopTracerProvider(), nil + } + hostname, err := nodeutil.GetHostname(s.HostnameOverride) + if err != nil { + return nil, fmt.Errorf("could not determine hostname for tracer provider: %v", err) + } + resourceOpts := []otelsdkresource.Option{ + otelsdkresource.WithAttributes( + semconv.ServiceNameKey.String(componentKubelet), + semconv.HostNameKey.String(hostname), + ), + } + tp, err := tracing.NewProvider(context.Background(), s.KubeletConfiguration.Tracing, []otlpgrpc.Option{}, resourceOpts) + if err != nil { + return nil, fmt.Errorf("could not configure tracer provider: %v", err) + } + return tp, nil +} diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index eacce3f6a43..56ea44eef64 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -26,6 +26,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + oteltrace "go.opentelemetry.io/otel/trace" internalapi "k8s.io/cri-api/pkg/apis" "k8s.io/klog/v2" @@ -246,7 +247,7 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) error { return fmt.Errorf("Failed to start fake runtime, error: %w", err) } defer fakeRemoteRuntime.Stop() - runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second) + runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider()) if err != nil { return fmt.Errorf("Failed to init runtime service, error: %w", err) } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 972c1946499..0c87cbab928 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -498,6 +498,13 @@ const ( // Enable POD resources API to return allocatable resources KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable" + // owner: @sallyom + // kep: http://kep.k8s.io/2832 + // alpha: v1.25 + // + // Add support for distributed tracing in the kubelet + KubeletTracing featuregate.Feature = "KubeletTracing" + // owner: @zshihang // kep: http://kep.k8s.io/2800 // beta: v1.24 @@ -961,6 +968,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS KubeletPodResourcesGetAllocatable: {Default: true, PreRelease: featuregate.Beta}, + KubeletTracing: {Default: false, PreRelease: featuregate.Alpha}, + LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta}, LocalStorageCapacityIsolation: {Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/kubeapiserver/admission/config.go b/pkg/kubeapiserver/admission/config.go index 3c9314b5801..6717f839a6f 100644 --- a/pkg/kubeapiserver/admission/config.go +++ b/pkg/kubeapiserver/admission/config.go @@ -47,7 +47,7 @@ type Config struct { } // New sets up the plugins and admission start hooks needed for admission -func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp *trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) { +func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) { webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp) webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver) diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index f599afc48e7..6934f8193d9 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -280,5 +280,7 @@ var ( "ShutdownGracePeriod.Duration", "ShutdownGracePeriodCriticalPods.Duration", "MemoryThrottlingFactor", + "Tracing.Endpoint", + "Tracing.SamplingRatePerMillion", ) ) diff --git a/pkg/kubelet/apis/config/register_test.go b/pkg/kubelet/apis/config/register_test.go index b67fcb0df62..339bafd8fad 100644 --- a/pkg/kubelet/apis/config/register_test.go +++ b/pkg/kubelet/apis/config/register_test.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" componentconfigtesting "k8s.io/component-base/config/testing" logsapi "k8s.io/component-base/logs/api/v1" + tracingapi "k8s.io/component-base/tracing/api/v1" ) func TestComponentConfigSetup(t *testing.T) { @@ -33,11 +34,12 @@ func TestComponentConfigSetup(t *testing.T) { SchemeGroupVersion: SchemeGroupVersion, AddToScheme: AddToScheme, AllowedTags: map[reflect.Type]bool{ - reflect.TypeOf(logsapi.LoggingConfiguration{}): true, - reflect.TypeOf(metav1.Duration{}): true, - reflect.TypeOf(metav1.TypeMeta{}): true, - reflect.TypeOf(v1.NodeConfigSource{}): true, - reflect.TypeOf(v1.Taint{}): true, + reflect.TypeOf(logsapi.LoggingConfiguration{}): true, + reflect.TypeOf(tracingapi.TracingConfiguration{}): true, + reflect.TypeOf(metav1.Duration{}): true, + reflect.TypeOf(metav1.TypeMeta{}): true, + reflect.TypeOf(v1.NodeConfigSource{}): true, + reflect.TypeOf(v1.Taint{}): true, }, } diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index 923645a5134..bcf25b977ee 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" logsapi "k8s.io/component-base/logs/api/v1" + tracingapi "k8s.io/component-base/tracing/api/v1" ) // HairpinMode denotes how the kubelet should configure networking to handle @@ -441,10 +442,14 @@ type KubeletConfiguration struct { // is true and upon the initial registration of the node. // +optional RegisterWithTaints []v1.Taint - // registerNode enables automatic registration with the apiserver. // +optional RegisterNode bool + // Tracing specifies the versioned configuration for OpenTelemetry tracing clients. + // See http://kep.k8s.io/2832 for more details. + // +featureGate=KubeletTracing + // +optional + Tracing *tracingapi.TracingConfiguration } // KubeletAuthorizationMode denotes the authorization mode for the kubelet diff --git a/pkg/kubelet/apis/config/validation/validation.go b/pkg/kubelet/apis/config/validation/validation.go index 872c912957c..afd43d70b8e 100644 --- a/pkg/kubelet/apis/config/validation/validation.go +++ b/pkg/kubelet/apis/config/validation/validation.go @@ -27,6 +27,7 @@ import ( "k8s.io/component-base/featuregate" logsapi "k8s.io/component-base/logs/api/v1" "k8s.io/component-base/metrics" + tracingapi "k8s.io/component-base/tracing/api/v1" "k8s.io/kubernetes/pkg/features" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -241,6 +242,14 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur allErrors = append(allErrors, errs.ToAggregate().Errors()...) } + if localFeatureGate.Enabled(features.KubeletTracing) { + if errs := tracingapi.ValidateTracingConfiguration(kc.Tracing, localFeatureGate, field.NewPath("tracing")); len(errs) > 0 { + allErrors = append(allErrors, errs.ToAggregate().Errors()...) + } + } else if kc.Tracing != nil { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled.")) + } + if localFeatureGate.Enabled(features.MemoryQoS) && kc.MemoryThrottlingFactor == nil { allErrors = append(allErrors, fmt.Errorf("invalid configuration: memoryThrottlingFactor is required when MemoryQoS feature flag is enabled")) } diff --git a/pkg/kubelet/apis/config/validation/validation_test.go b/pkg/kubelet/apis/config/validation/validation_test.go index 26c517d1442..a86d16be552 100644 --- a/pkg/kubelet/apis/config/validation/validation_test.go +++ b/pkg/kubelet/apis/config/validation/validation_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" logsapi "k8s.io/component-base/logs/api/v1" + tracingapi "k8s.io/component-base/tracing/api/v1" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" "k8s.io/kubernetes/pkg/kubelet/apis/config/validation" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -497,6 +498,36 @@ func TestValidateKubeletConfiguration(t *testing.T) { }, errMsg: "invalid configuration: taint.TimeAdded is not nil", }, + { + name: "specify tracing with KubeletTracing disabled", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + samplingRate := int32(99999) + conf.FeatureGates = map[string]bool{"KubeletTracing": false} + conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate} + return conf + }, + errMsg: "invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled.", + }, + { + name: "specify tracing invalid sampling rate", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + samplingRate := int32(-1) + conf.FeatureGates = map[string]bool{"KubeletTracing": true} + conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate} + return conf + }, + errMsg: "tracing.samplingRatePerMillion: Invalid value: -1: sampling rate must be positive", + }, + { + name: "specify tracing invalid endpoint", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + ep := "dn%2s://localhost:4317" + conf.FeatureGates = map[string]bool{"KubeletTracing": true} + conf.Tracing = &tracingapi.TracingConfiguration{Endpoint: &ep} + return conf + }, + errMsg: "tracing.endpoint: Invalid value: \"dn%2s://localhost:4317\": parse \"dn%2s://localhost:4317\": first path segment in URL cannot contain colon", + }, } for _, tc := range cases { diff --git a/pkg/kubelet/cri/remote/remote_runtime.go b/pkg/kubelet/cri/remote/remote_runtime.go index aaed9f33805..a0637466795 100644 --- a/pkg/kubelet/cri/remote/remote_runtime.go +++ b/pkg/kubelet/cri/remote/remote_runtime.go @@ -23,16 +23,20 @@ import ( "strings" "time" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "k8s.io/klog/v2" - + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/logs/logreduction" + tracing "k8s.io/component-base/tracing" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/probe/exec" utilexec "k8s.io/utils/exec" @@ -68,7 +72,7 @@ const ( ) // NewRemoteRuntimeService creates a new internalapi.RuntimeService. -func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) { +func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) { klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint) addr, dialer, err := util.GetAddressAndDialer(endpoint) if err != nil { @@ -77,10 +81,23 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) ( ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) defer cancel() - conn, err := grpc.DialContext(ctx, addr, + dialOpts := []grpc.DialOption{} + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithPropagators(tracing.Propagators()), + otelgrpc.WithTracerProvider(tp), + } + // Even if there is no TracerProvider, the otelgrpc still handles context propagation. + // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough + dialOpts = append(dialOpts, + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))) + } + conn, err := grpc.DialContext(ctx, addr, dialOpts...) if err != nil { klog.ErrorS(err, "Connect remote runtime failed", "address", addr) return nil, err diff --git a/pkg/kubelet/cri/remote/remote_runtime_test.go b/pkg/kubelet/cri/remote/remote_runtime_test.go index 7cdb00ba780..1407a4bc9b9 100644 --- a/pkg/kubelet/cri/remote/remote_runtime_test.go +++ b/pkg/kubelet/cri/remote/remote_runtime_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + oteltrace "go.opentelemetry.io/otel/trace" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" internalapi "k8s.io/cri-api/pkg/apis" @@ -47,7 +49,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s } func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService { - runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout) + runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider()) require.NoError(t, err) return runtimeService diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 24310cc3c3a..bc784890ae2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -38,6 +38,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns" + "go.opentelemetry.io/otel/trace" "k8s.io/mount-utils" "k8s.io/utils/integer" netutils "k8s.io/utils/net" @@ -206,7 +207,7 @@ type Bootstrap interface { GetConfiguration() kubeletconfiginternal.KubeletConfiguration BirthCry() StartGarbageCollection() - ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface) + ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider) ListenAndServeReadOnly(address net.IP, port uint) ListenAndServePodResources() Run(<-chan kubetypes.PodUpdate) @@ -236,6 +237,7 @@ type Dependencies struct { ProbeManager prober.Manager Recorder record.EventRecorder Subpather subpath.Interface + TracerProvider trace.TracerProvider VolumePlugins []volume.VolumePlugin DynamicPluginProber volume.DynamicPluginProber TLSOptions *server.TLSOptions @@ -293,7 +295,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } var err error - if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil { + if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil { return err } if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil { @@ -2395,8 +2397,8 @@ func (kl *Kubelet) ResyncInterval() time.Duration { // ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, - auth server.AuthInterface) { - server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth) + auth server.AuthInterface, tp trace.TracerProvider) { + server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp) } // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 3f33c8885d1..a7f4ca9ebbf 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -37,6 +37,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorv2 "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/metrics" + oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" @@ -63,6 +64,7 @@ import ( compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + tracing "k8s.io/component-base/tracing" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -127,10 +129,13 @@ type containerInterface interface { // so we can ensure restful.FilterFunctions are used for all handlers type filteringContainer struct { *restful.Container + oteltrace.TracerProvider + registeredHandlePaths []string } func (a *filteringContainer) Handle(path string, handler http.Handler) { + handler = tracing.WithTracing(handler, a.TracerProvider, "kubelet") a.HandleWithFilter(path, handler) a.registeredHandlePaths = append(a.registeredHandlePaths, path) } @@ -144,12 +149,13 @@ func ListenAndServeKubeletServer( resourceAnalyzer stats.ResourceAnalyzer, kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *TLSOptions, - auth AuthInterface) { + auth AuthInterface, + tp oteltrace.TracerProvider) { address := netutils.ParseIPSloppy(kubeCfg.Address) port := uint(kubeCfg.Port) klog.InfoS("Starting to listen", "address", address, "port", port) - handler := NewServer(host, resourceAnalyzer, auth, kubeCfg) + handler := NewServer(host, resourceAnalyzer, auth, tp, kubeCfg) s := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Handler: &handler, @@ -158,6 +164,7 @@ func ListenAndServeKubeletServer( WriteTimeout: 4 * 60 * time.Minute, MaxHeaderBytes: 1 << 20, } + if tlsOptions != nil { s.TLSConfig = tlsOptions.Config // Passing empty strings as the cert and key files means no @@ -174,9 +181,14 @@ func ListenAndServeKubeletServer( } // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. -func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) { +func ListenAndServeKubeletReadOnlyServer( + host HostInterface, + resourceAnalyzer stats.ResourceAnalyzer, + address net.IP, + port uint) { klog.InfoS("Starting to listen read-only", "address", address, "port", port) - s := NewServer(host, resourceAnalyzer, nil, nil) + // TODO: https://github.com/kubernetes/kubernetes/issues/109829 tracer should use WithPublicEndpoint + s := NewServer(host, resourceAnalyzer, nil, oteltrace.NewNoopTracerProvider(), nil) server := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), @@ -241,12 +253,15 @@ func NewServer( host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, + tp oteltrace.TracerProvider, kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server { + rc := &filteringContainer{Container: restful.NewContainer(), TracerProvider: tp} + server := Server{ host: host, resourceAnalyzer: resourceAnalyzer, auth: auth, - restfulCont: &filteringContainer{Container: restful.NewContainer()}, + restfulCont: rc, metricsBuckets: sets.NewString(), metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"), } diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 521716aa59c..734cfe23d5d 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -37,6 +37,7 @@ import ( cadvisorapiv2 "github.com/google/cadvisor/info/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + oteltrace "go.opentelemetry.io/otel/trace" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -358,6 +359,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo fw.fakeKubelet, stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), fw.fakeAuth, + oteltrace.NewNoopTracerProvider(), kubeCfg, ) fw.serverUnderTest = &server diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go index b81ece7775a..97a662be321 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options/options.go @@ -23,6 +23,7 @@ import ( "net/url" "github.com/spf13/pflag" + oteltrace "go.opentelemetry.io/otel/trace" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -111,7 +112,7 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err ExtraConfig: apiserver.ExtraConfig{ CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd), ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()}, - AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, nil), + AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, oteltrace.NewNoopTracerProvider()), }, } return config, nil diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go index a3339773e9f..67a1790c56a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/traces.go @@ -22,19 +22,17 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/trace" - "k8s.io/component-base/traces" + tracing "k8s.io/component-base/tracing" ) // WithTracing adds tracing to requests if the incoming request is sampled -func WithTracing(handler http.Handler, tp *trace.TracerProvider) http.Handler { +func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler { opts := []otelhttp.Option{ - otelhttp.WithPropagators(traces.Propagators()), + otelhttp.WithPropagators(tracing.Propagators()), otelhttp.WithPublicEndpoint(), + otelhttp.WithTracerProvider(tp), } - if tp != nil { - opts = append(opts, otelhttp.WithTracerProvider(*tp)) - } - // Even if there is no TracerProvider, the otelhttp still handles context propagation. + // With Noop TracerProvider, the otelhttp still handles context propagation. // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough return otelhttp.NewHandler(handler, "KubernetesAPI", opts...) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index ddff43e5370..d21ea2ef000 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -30,7 +30,7 @@ import ( jsonpatch "github.com/evanphx/json-patch" "github.com/google/uuid" - "go.opentelemetry.io/otel/trace" + oteltrace "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -139,7 +139,7 @@ type Config struct { ExternalAddress string // TracerProvider can provide a tracer, which records spans for distributed tracing. - TracerProvider *trace.TracerProvider + TracerProvider oteltrace.TracerProvider //=========================================================================== // Fields you probably don't care about changing @@ -372,6 +372,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config { APIServerID: id, StorageVersionManager: storageversion.NewDefaultManager(), + TracerProvider: oteltrace.NewNoopTracerProvider(), } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/coreapi.go b/staging/src/k8s.io/apiserver/pkg/server/options/coreapi.go index c1293980f7d..12a65517e1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/coreapi.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/coreapi.go @@ -28,7 +28,7 @@ import ( clientgoclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/component-base/traces" + tracing "k8s.io/component-base/tracing" ) // CoreAPIOptions contains options to configure the connection to a core API Kubernetes apiserver. @@ -73,7 +73,7 @@ func (o *CoreAPIOptions) ApplyTo(config *server.RecommendedConfig) error { } } if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { - kubeconfig.Wrap(traces.WrapperFor(config.TracerProvider)) + kubeconfig.Wrap(tracing.WrapperFor(config.TracerProvider)) } clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeconfig) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go index 0d9c6ca9f40..4c8e1e2a3e7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/recommended.go @@ -107,10 +107,8 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error { if err := o.EgressSelector.ApplyTo(&config.Config); err != nil { return err } - if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { - if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil { - return err - } + if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil { + return err } if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil { return err diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/tracing.go b/staging/src/k8s.io/apiserver/pkg/server/options/tracing.go index bea7f363a93..379691a0803 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/tracing.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/tracing.go @@ -19,24 +19,39 @@ package options import ( "context" "fmt" + "io/ioutil" "net" "github.com/spf13/pflag" "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/semconv" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/apis/apiserver" + "k8s.io/apiserver/pkg/apis/apiserver/install" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" - "k8s.io/apiserver/pkg/tracing" - "k8s.io/component-base/traces" + "k8s.io/apiserver/pkg/util/feature" + tracing "k8s.io/component-base/tracing" + tracingapi "k8s.io/component-base/tracing/api/v1" "k8s.io/utils/path" ) const apiserverService = "apiserver" +var ( + cfgScheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(cfgScheme) +) + +func init() { + install.Install(cfgScheme) +} + // TracingOptions contain configuration options for tracing // exporters type TracingOptions struct { @@ -64,21 +79,21 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co if o == nil || o.ConfigFile == "" { return nil } + if !feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { + return fmt.Errorf("APIServerTracing feature is not enabled, but tracing config file was provided") + } - npConfig, err := tracing.ReadTracingConfiguration(o.ConfigFile) + traceConfig, err := ReadTracingConfiguration(o.ConfigFile) if err != nil { return fmt.Errorf("failed to read tracing config: %v", err) } - errs := tracing.ValidateTracingConfiguration(npConfig) + errs := tracingapi.ValidateTracingConfiguration(traceConfig, feature.DefaultFeatureGate, nil) if len(errs) > 0 { return fmt.Errorf("failed to validate tracing configuration: %v", errs.ToAggregate()) } opts := []otlpgrpc.Option{} - if npConfig.Endpoint != nil { - opts = append(opts, otlpgrpc.WithEndpoint(*npConfig.Endpoint)) - } if es != nil { // Only use the egressselector dialer if egressselector is enabled. // Endpoint is on the "ControlPlane" network @@ -93,21 +108,19 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co opts = append(opts, otlpgrpc.WithDialOption(grpc.WithContextDialer(otelDialer))) } - sampler := sdktrace.NeverSample() - if npConfig.SamplingRatePerMillion != nil && *npConfig.SamplingRatePerMillion > 0 { - sampler = sdktrace.TraceIDRatioBased(float64(*npConfig.SamplingRatePerMillion) / float64(1000000)) - } - resourceOpts := []resource.Option{ resource.WithAttributes( semconv.ServiceNameKey.String(apiserverService), semconv.ServiceInstanceIDKey.String(c.APIServerID), ), } - tp := traces.NewProvider(context.Background(), sampler, resourceOpts, opts...) - c.TracerProvider = &tp + tp, err := tracing.NewProvider(context.Background(), traceConfig, opts, resourceOpts) + if err != nil { + return err + } + c.TracerProvider = tp if c.LoopbackClientConfig != nil { - c.LoopbackClientConfig.Wrap(traces.WrapperFor(c.TracerProvider)) + c.LoopbackClientConfig.Wrap(tracing.WrapperFor(c.TracerProvider)) } return nil } @@ -125,3 +138,24 @@ func (o *TracingOptions) Validate() (errs []error) { } return } + +// ReadTracingConfiguration reads the tracing configuration from a file +func ReadTracingConfiguration(configFilePath string) (*tracingapi.TracingConfiguration, error) { + if configFilePath == "" { + return nil, fmt.Errorf("tracing config file was empty") + } + data, err := ioutil.ReadFile(configFilePath) + if err != nil { + return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err) + } + internalConfig := &apiserver.TracingConfiguration{} + // this handles json/yaml/whatever, and decodes all registered version to the internal version + if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil { + return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err) + } + tc := &tracingapi.TracingConfiguration{ + Endpoint: internalConfig.Endpoint, + SamplingRatePerMillion: internalConfig.SamplingRatePerMillion, + } + return tc, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/tracing_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/tracing_test.go index a1090ce973b..391fadb7b5c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/tracing_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/tracing_test.go @@ -16,7 +16,26 @@ limitations under the License. package options -import "testing" +import ( + "fmt" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" + + tracingapi "k8s.io/component-base/tracing/api/v1" +) + +var ( + localhost = "localhost:4317" + ipAddress = "127.0.0.1:4317" + samplingRate = int32(12345) +) + +func strptr(s string) *string { + return &s +} func TestValidateTracingOptions(t *testing.T) { testcases := []struct { @@ -56,3 +75,110 @@ func TestValidateTracingOptions(t *testing.T) { }) } } + +func TestReadTracingConfiguration(t *testing.T) { + testcases := []struct { + name string + contents string + createFile bool + expectedResult *tracingapi.TracingConfiguration + expectedError *string + }{ + { + name: "empty", + createFile: true, + contents: ``, + expectedResult: &tracingapi.TracingConfiguration{}, + expectedError: nil, + }, + { + name: "absent", + createFile: false, + contents: ``, + expectedResult: nil, + expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"), + }, + { + name: "v1alpha1", + createFile: true, + contents: ` +apiVersion: apiserver.config.k8s.io/v1alpha1 +kind: TracingConfiguration +endpoint: localhost:4317 +samplingRatePerMillion: 12345 +`, + expectedResult: &tracingapi.TracingConfiguration{ + Endpoint: &localhost, + SamplingRatePerMillion: &samplingRate, + }, + expectedError: nil, + }, + { + name: "ip address", + createFile: true, + contents: ` +apiVersion: apiserver.config.k8s.io/v1alpha1 +kind: TracingConfiguration +endpoint: 127.0.0.1:4317 +`, + expectedResult: &tracingapi.TracingConfiguration{ + Endpoint: &ipAddress, + }, + expectedError: nil, + }, + { + name: "wrong_type", + createFile: true, + contents: ` +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: agent +spec: + selector: + matchLabels: + k8s-app: agent + template: + metadata: + labels: + k8s-app: agent + spec: + containers: + - image: k8s.gcr.io/busybox + name: agent +`, + expectedResult: nil, + expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name) + if tc.createFile { + f, err := ioutil.TempFile("", proxyConfig) + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil { + t.Fatal(err) + } + proxyConfig = f.Name() + } + config, err := ReadTracingConfiguration(proxyConfig) + if err == nil && tc.expectedError != nil { + t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError) + } + if err != nil && tc.expectedError == nil { + t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err) + } + if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) { + t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err) + } + if !reflect.DeepEqual(config, tc.expectedResult) { + t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 3dc7d0c0082..47534c97818 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -19,7 +19,8 @@ package storagebackend import ( "time" - "go.opentelemetry.io/otel/trace" + oteltrace "go.opentelemetry.io/otel/trace" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/server/egressselector" @@ -50,7 +51,7 @@ type TransportConfig struct { // function to determine the egress dialer. (i.e. konnectivity server dialer) EgressLookup egressselector.Lookup // The TracerProvider can add tracing the connection - TracerProvider *trace.TracerProvider + TracerProvider oteltrace.TracerProvider } // Config is configuration for creating a storage backend. @@ -122,5 +123,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { HealthcheckTimeout: DefaultHealthcheckTimeout, ReadycheckTimeout: DefaultReadinessTimeout, LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(), + Transport: TransportConfig{TracerProvider: oteltrace.NewNoopTracerProvider()}, } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 07b61d357a0..ef75c81ce7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -45,7 +45,7 @@ import ( "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics/legacyregistry" - "k8s.io/component-base/traces" + tracing "k8s.io/component-base/tracing" "k8s.io/klog/v2" ) @@ -191,12 +191,10 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e } if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { tracingOpts := []otelgrpc.Option{ - otelgrpc.WithPropagators(traces.Propagators()), + otelgrpc.WithPropagators(tracing.Propagators()), + otelgrpc.WithTracerProvider(c.TracerProvider), } - if c.TracerProvider != nil { - tracingOpts = append(tracingOpts, otelgrpc.WithTracerProvider(*c.TracerProvider)) - } - // Even if there is no TracerProvider, the otelgrpc still handles context propagation. + // Even with Noop TracerProvider, the otelgrpc still handles context propagation. // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go index 86d9cd1e9f9..8d21d23383e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go @@ -25,6 +25,7 @@ import ( "testing" "go.etcd.io/etcd/client/pkg/v3/transport" + oteltrace "go.opentelemetry.io/otel/trace" apitesting "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -72,10 +73,11 @@ func TestTLSConnection(t *testing.T) { cfg := storagebackend.Config{ Type: storagebackend.StorageTypeETCD3, Transport: storagebackend.TransportConfig{ - ServerList: client.Endpoints(), - CertFile: certFile, - KeyFile: keyFile, - TrustedCAFile: caFile, + ServerList: client.Endpoints(), + CertFile: certFile, + KeyFile: keyFile, + TrustedCAFile: caFile, + TracerProvider: oteltrace.NewNoopTracerProvider(), }, Codec: codec, } diff --git a/staging/src/k8s.io/apiserver/pkg/tracing/config_test.go b/staging/src/k8s.io/apiserver/pkg/tracing/config_test.go deleted file mode 100644 index ec8171cb4c4..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/tracing/config_test.go +++ /dev/null @@ -1,270 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tracing - -import ( - "fmt" - "io/ioutil" - "os" - "reflect" - "strings" - "testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/apis/apiserver" -) - -var ( - localhost = "localhost:4317" - ipAddress = "127.0.0.1:4317" - samplingRate = int32(12345) -) - -func strptr(s string) *string { - return &s -} - -func TestReadTracingConfiguration(t *testing.T) { - testcases := []struct { - name string - contents string - createFile bool - expectedResult *apiserver.TracingConfiguration - expectedError *string - }{ - { - name: "empty", - createFile: true, - contents: ``, - expectedResult: &apiserver.TracingConfiguration{}, - expectedError: nil, - }, - { - name: "absent", - createFile: false, - contents: ``, - expectedResult: nil, - expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"), - }, - { - name: "v1alpha1", - createFile: true, - contents: ` -apiVersion: apiserver.config.k8s.io/v1alpha1 -kind: TracingConfiguration -endpoint: localhost:4317 -samplingRatePerMillion: 12345 -`, - expectedResult: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &localhost, - SamplingRatePerMillion: &samplingRate, - }, - expectedError: nil, - }, - { - name: "ip address", - createFile: true, - contents: ` -apiVersion: apiserver.config.k8s.io/v1alpha1 -kind: TracingConfiguration -endpoint: 127.0.0.1:4317 -`, - expectedResult: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &ipAddress, - }, - expectedError: nil, - }, - { - name: "wrong_type", - createFile: true, - contents: ` -apiVersion: apps/v1 -kind: DaemonSet -metadata: - name: agent -spec: - selector: - matchLabels: - k8s-app: agent - template: - metadata: - labels: - k8s-app: agent - spec: - containers: - - image: registry.k8s.io/busybox - name: agent -`, - expectedResult: nil, - expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"), - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name) - if tc.createFile { - f, err := ioutil.TempFile("", proxyConfig) - if err != nil { - t.Fatal(err) - } - defer os.Remove(f.Name()) - if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil { - t.Fatal(err) - } - proxyConfig = f.Name() - } - config, err := ReadTracingConfiguration(proxyConfig) - if err == nil && tc.expectedError != nil { - t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError) - } - if err != nil && tc.expectedError == nil { - t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err) - } - if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) { - t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err) - } - if !reflect.DeepEqual(config, tc.expectedResult) { - t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config) - } - }) - } -} - -func TestValidateTracingConfiguration(t *testing.T) { - samplingRate := int32(12378) - negativeRate := int32(-1) - tooHighRate := int32(1000001) - validEndpoint := "localhost:4317" - dnsEndpoint := "dns://google.com:4317" - unixEndpoint := "unix://path/to/socket" - invalidURL := "dn%2s://localhost:4317" - httpEndpoint := "http://localhost:4317" - testcases := []struct { - name string - expectError bool - contents *apiserver.TracingConfiguration - }{ - { - name: "sampling-rate-valid", - expectError: false, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - SamplingRatePerMillion: &samplingRate, - }, - }, - { - name: "sampling-rate-negative", - expectError: true, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - SamplingRatePerMillion: &negativeRate, - }, - }, - { - name: "sampling-rate-negative", - expectError: true, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - SamplingRatePerMillion: &tooHighRate, - }, - }, - { - name: "default Endpoint", - expectError: false, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &validEndpoint, - }, - }, - { - name: "dns Endpoint", - expectError: false, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &dnsEndpoint, - }, - }, - { - name: "unix Endpoint", - expectError: false, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &unixEndpoint, - }, - }, - { - name: "invalid Endpoint", - expectError: true, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &httpEndpoint, - }, - }, - { - name: "invalid url", - expectError: true, - contents: &apiserver.TracingConfiguration{ - TypeMeta: metav1.TypeMeta{ - Kind: "", - APIVersion: "", - }, - Endpoint: &invalidURL, - }, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - errs := ValidateTracingConfiguration(tc.contents) - if tc.expectError == false && len(errs) != 0 { - t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs) - } else if tc.expectError == true && len(errs) == 0 { - t.Errorf("Calling ValidateTracingConfiguration expected error, got no error") - } - }) - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/webhook/authentication.go b/staging/src/k8s.io/apiserver/pkg/util/webhook/authentication.go index d61e46aaf76..a69506de690 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/webhook/authentication.go +++ b/staging/src/k8s.io/apiserver/pkg/util/webhook/authentication.go @@ -35,7 +35,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "k8s.io/component-base/traces" + tracing "k8s.io/component-base/tracing" ) // AuthenticationInfoResolverWrapper can be used to inject Dial function to the @@ -47,7 +47,7 @@ func NewDefaultAuthenticationInfoResolverWrapper( proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, kubeapiserverClientConfig *rest.Config, - tp *trace.TracerProvider) AuthenticationInfoResolverWrapper { + tp trace.TracerProvider) AuthenticationInfoResolverWrapper { webhookAuthResolverWrapper := func(delegate AuthenticationInfoResolver) AuthenticationInfoResolver { return &AuthenticationInfoResolverDelegator{ @@ -60,7 +60,7 @@ func NewDefaultAuthenticationInfoResolverWrapper( return nil, err } if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { - ret.Wrap(traces.WrapperFor(tp)) + ret.Wrap(tracing.WrapperFor(tp)) } if egressSelector != nil { @@ -85,7 +85,7 @@ func NewDefaultAuthenticationInfoResolverWrapper( return nil, err } if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { - ret.Wrap(traces.WrapperFor(tp)) + ret.Wrap(tracing.WrapperFor(tp)) } if egressSelector != nil { diff --git a/staging/src/k8s.io/component-base/traces/utils.go b/staging/src/k8s.io/component-base/traces/utils.go deleted file mode 100644 index fafbb631e1a..00000000000 --- a/staging/src/k8s.io/component-base/traces/utils.go +++ /dev/null @@ -1,81 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package traces - -import ( - "context" - "net/http" - - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "go.opentelemetry.io/otel/exporters/otlp" - "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" - - "k8s.io/client-go/transport" - "k8s.io/klog/v2" -) - -// NewProvider initializes tracing in the component, and enforces recommended tracing behavior. -func NewProvider(ctx context.Context, baseSampler sdktrace.Sampler, resourceOpts []resource.Option, opts ...otlpgrpc.Option) trace.TracerProvider { - opts = append(opts, otlpgrpc.WithInsecure()) - driver := otlpgrpc.NewDriver(opts...) - exporter, err := otlp.NewExporter(ctx, driver) - if err != nil { - klog.Fatalf("Failed to create OTLP exporter: %v", err) - } - - res, err := resource.New(ctx, resourceOpts...) - if err != nil { - klog.Fatalf("Failed to create resource: %v", err) - } - - bsp := sdktrace.NewBatchSpanProcessor(exporter) - - return sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.ParentBased(baseSampler)), - sdktrace.WithSpanProcessor(bsp), - sdktrace.WithResource(res), - ) -} - -// WrapperFor can be used to add tracing to a *rest.Config. Example usage: -// -// tp := traces.NewProvider(...) -// config, _ := rest.InClusterConfig() -// config.Wrap(traces.WrapperFor(&tp)) -// kubeclient, _ := clientset.NewForConfig(config) -func WrapperFor(tp *trace.TracerProvider) transport.WrapperFunc { - return func(rt http.RoundTripper) http.RoundTripper { - opts := []otelhttp.Option{ - otelhttp.WithPropagators(Propagators()), - } - if tp != nil { - opts = append(opts, otelhttp.WithTracerProvider(*tp)) - } - // Even if there is no TracerProvider, the otelhttp still handles context propagation. - // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough - return otelhttp.NewTransport(rt, opts...) - } -} - -// Propagators returns the recommended set of propagators. -func Propagators() propagation.TextMapPropagator { - return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) -} diff --git a/staging/src/k8s.io/component-base/traces/OWNERS b/staging/src/k8s.io/component-base/tracing/OWNERS similarity index 100% rename from staging/src/k8s.io/component-base/traces/OWNERS rename to staging/src/k8s.io/component-base/tracing/OWNERS diff --git a/staging/src/k8s.io/component-base/tracing/api/OWNERS b/staging/src/k8s.io/component-base/tracing/api/OWNERS new file mode 100644 index 00000000000..b26e7a4dc7e --- /dev/null +++ b/staging/src/k8s.io/component-base/tracing/api/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-instrumentation-approvers +reviewers: + - sig-instrumentation-reviewers +labels: + - sig/instrumentation diff --git a/staging/src/k8s.io/apiserver/pkg/tracing/config.go b/staging/src/k8s.io/component-base/tracing/api/v1/config.go similarity index 54% rename from staging/src/k8s.io/apiserver/pkg/tracing/config.go rename to staging/src/k8s.io/component-base/tracing/api/v1/config.go index 33ec6ae1087..ae9bbbfc092 100644 --- a/staging/src/k8s.io/apiserver/pkg/tracing/config.go +++ b/staging/src/k8s.io/component-base/tracing/api/v1/config.go @@ -14,63 +14,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tracing +package v1 import ( "fmt" - "io/ioutil" "net/url" "strings" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apiserver/pkg/apis/apiserver" - "k8s.io/apiserver/pkg/apis/apiserver/install" -) - -const ( - maxSamplingRatePerMillion = 1000000 + "k8s.io/component-base/featuregate" ) var ( - cfgScheme = runtime.NewScheme() - codecs = serializer.NewCodecFactory(cfgScheme) + maxSamplingRatePerMillion = int32(1000000) ) -func init() { - install.Install(cfgScheme) -} - -// ReadTracingConfiguration reads the tracing configuration from a file -func ReadTracingConfiguration(configFilePath string) (*apiserver.TracingConfiguration, error) { - if configFilePath == "" { - return nil, fmt.Errorf("tracing config file was empty") - } - data, err := ioutil.ReadFile(configFilePath) - if err != nil { - return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err) - } - internalConfig := &apiserver.TracingConfiguration{} - // this handles json/yaml/whatever, and decodes all registered version to the internal version - if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil { - return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err) - } - return internalConfig, nil -} - // ValidateTracingConfiguration validates the tracing configuration -func ValidateTracingConfiguration(config *apiserver.TracingConfiguration) field.ErrorList { +func ValidateTracingConfiguration(traceConfig *TracingConfiguration, featureGate featuregate.FeatureGate, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - if config == nil { - // Tracing is disabled + if traceConfig == nil { return allErrs } - if config.SamplingRatePerMillion != nil { - allErrs = append(allErrs, validateSamplingRate(*config.SamplingRatePerMillion, field.NewPath("samplingRatePerMillion"))...) + if traceConfig.SamplingRatePerMillion != nil { + allErrs = append(allErrs, validateSamplingRate(*traceConfig.SamplingRatePerMillion, fldPath.Child("samplingRatePerMillion"))...) } - if config.Endpoint != nil { - allErrs = append(allErrs, validateEndpoint(*config.Endpoint, field.NewPath("endpoint"))...) + if traceConfig.Endpoint != nil { + allErrs = append(allErrs, validateEndpoint(*traceConfig.Endpoint, fldPath.Child("endpoint"))...) } return allErrs } diff --git a/staging/src/k8s.io/component-base/tracing/api/v1/config_test.go b/staging/src/k8s.io/component-base/tracing/api/v1/config_test.go new file mode 100644 index 00000000000..1aa4c0664ea --- /dev/null +++ b/staging/src/k8s.io/component-base/tracing/api/v1/config_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" + + "k8s.io/apimachinery/pkg/util/validation/field" +) + +func TestValidateTracingConfiguration(t *testing.T) { + samplingRate := int32(12378) + negativeRate := int32(-1) + tooHighRate := int32(1000001) + validEndpoint := "localhost:4317" + dnsEndpoint := "dns://google.com:4317" + unixEndpoint := "unix://path/to/socket" + invalidURL := "dn%2s://localhost:4317" + httpEndpoint := "http://localhost:4317" + testcases := []struct { + name string + expectError bool + contents *TracingConfiguration + }{ + { + name: "sampling-rate-valid", + expectError: false, + contents: &TracingConfiguration{ + SamplingRatePerMillion: &samplingRate, + }, + }, + { + name: "sampling-rate-negative", + expectError: true, + contents: &TracingConfiguration{ + SamplingRatePerMillion: &negativeRate, + }, + }, + { + name: "sampling-rate-negative", + expectError: true, + contents: &TracingConfiguration{ + SamplingRatePerMillion: &tooHighRate, + }, + }, + { + name: "default Endpoint", + expectError: false, + contents: &TracingConfiguration{ + Endpoint: &validEndpoint, + }, + }, + { + name: "dns Endpoint", + expectError: false, + contents: &TracingConfiguration{ + Endpoint: &dnsEndpoint, + }, + }, + { + name: "unix Endpoint", + expectError: false, + contents: &TracingConfiguration{ + Endpoint: &unixEndpoint, + }, + }, + { + name: "invalid Endpoint", + expectError: true, + contents: &TracingConfiguration{ + Endpoint: &httpEndpoint, + }, + }, + { + name: "invalid url", + expectError: true, + contents: &TracingConfiguration{ + Endpoint: &invalidURL, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + errs := ValidateTracingConfiguration(tc.contents, nil, field.NewPath("tracing")) + if tc.expectError == false && len(errs) != 0 { + t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs) + } else if tc.expectError == true && len(errs) == 0 { + t.Errorf("Calling ValidateTracingConfiguration expected error, got no error") + } + }) + } +} diff --git a/staging/src/k8s.io/component-base/tracing/api/v1/doc.go b/staging/src/k8s.io/component-base/tracing/api/v1/doc.go new file mode 100644 index 00000000000..ffe36aef6f0 --- /dev/null +++ b/staging/src/k8s.io/component-base/tracing/api/v1/doc.go @@ -0,0 +1,29 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package + +// Package v1 contains the configuration API for tracing. +// +// The intention is to only have a single version of this API, potentially with +// new fields added over time in a backwards-compatible manner. Fields for +// alpha or beta features are allowed as long as they are defined so that not +// changing the defaults leaves those features disabled. +// +// The "v1" package name is just a reminder that API compatibility rules apply, +// not an indication of the stability of all features covered by it. + +package v1 // import "k8s.io/component-base/tracing/api/v1" diff --git a/staging/src/k8s.io/component-base/tracing/api/v1/types.go b/staging/src/k8s.io/component-base/tracing/api/v1/types.go new file mode 100644 index 00000000000..7b0776fd4c5 --- /dev/null +++ b/staging/src/k8s.io/component-base/tracing/api/v1/types.go @@ -0,0 +1,32 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +// TracingConfiguration provides versioned configuration for OpenTelemetry tracing clients. +type TracingConfiguration struct { + // Endpoint of the collector this component will report traces to. + // The connection is insecure, and does not currently support TLS. + // Recommended is unset, and endpoint is the otlp grpc default, localhost:4317. + // +optional + Endpoint *string `json:"endpoint,omitempty"` + + // SamplingRatePerMillion is the number of samples to collect per million spans. + // Recommended is unset. If unset, sampler respects its parent span's sampling + // rate, but otherwise never samples. + // +optional + SamplingRatePerMillion *int32 `json:"samplingRatePerMillion,omitempty"` +} diff --git a/staging/src/k8s.io/component-base/tracing/utils.go b/staging/src/k8s.io/component-base/tracing/utils.go new file mode 100644 index 00000000000..8c7ec983ca9 --- /dev/null +++ b/staging/src/k8s.io/component-base/tracing/utils.go @@ -0,0 +1,108 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tracing + +import ( + "context" + "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" + + "k8s.io/client-go/transport" + "k8s.io/component-base/tracing/api/v1" +) + +// NewProvider creates a TracerProvider in a component, and enforces recommended tracing behavior +func NewProvider(ctx context.Context, + tracingConfig *v1.TracingConfiguration, + addedOpts []otlpgrpc.Option, + resourceOpts []resource.Option, +) (oteltrace.TracerProvider, error) { + if tracingConfig == nil { + return oteltrace.NewNoopTracerProvider(), nil + } + opts := append([]otlpgrpc.Option{}, addedOpts...) + if tracingConfig.Endpoint != nil { + opts = append(opts, otlpgrpc.WithEndpoint(*tracingConfig.Endpoint)) + } + opts = append(opts, otlpgrpc.WithInsecure()) + driver := otlpgrpc.NewDriver(opts...) + exporter, err := otlp.NewExporter(ctx, driver) + if err != nil { + return nil, err + } + res, err := resource.New(ctx, resourceOpts...) + if err != nil { + return nil, err + } + + // sampler respects parent span's sampling rate or + // otherwise never samples. + sampler := sdktrace.NeverSample() + // Or, emit spans for a fraction of transactions + if tracingConfig.SamplingRatePerMillion != nil && *tracingConfig.SamplingRatePerMillion > 0 { + sampler = sdktrace.TraceIDRatioBased(float64(*tracingConfig.SamplingRatePerMillion) / float64(1000000)) + } + // batch span processor to aggregate spans before export. + bsp := sdktrace.NewBatchSpanProcessor(exporter) + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.ParentBased(sampler)), + sdktrace.WithSpanProcessor(bsp), + sdktrace.WithResource(res), + ) + return tp, nil +} + +// WithTracing adds tracing to requests if the incoming request is sampled +func WithTracing(handler http.Handler, tp oteltrace.TracerProvider, serviceName string) http.Handler { + opts := []otelhttp.Option{ + otelhttp.WithPropagators(Propagators()), + otelhttp.WithTracerProvider(tp), + } + // With Noop TracerProvider, the otelhttp still handles context propagation. + // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough + return otelhttp.NewHandler(handler, serviceName, opts...) +} + +// WrapperFor can be used to add tracing to a *rest.Config. +// Example usage: +// tp := NewProvider(...) +// config, _ := rest.InClusterConfig() +// config.Wrap(WrapperFor(&tp)) +// kubeclient, _ := clientset.NewForConfig(config) +func WrapperFor(tp oteltrace.TracerProvider) transport.WrapperFunc { + return func(rt http.RoundTripper) http.RoundTripper { + opts := []otelhttp.Option{ + otelhttp.WithPropagators(Propagators()), + otelhttp.WithTracerProvider(tp), + } + // With Noop TracerProvider, the otelhttp still handles context propagation. + // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough + return otelhttp.NewTransport(rt, opts...) + } +} + +// Propagators returns the recommended set of propagators. +func Propagators() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) +} diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index 2504f2aa253..36cbc660268 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" logsapi "k8s.io/component-base/logs/api/v1" + tracingapi "k8s.io/component-base/tracing/api/v1" ) // HairpinMode denotes how the kubelet should configure networking to handle @@ -780,6 +781,11 @@ type KubeletConfiguration struct { // Default: true // +optional RegisterNode *bool `json:"registerNode,omitempty"` + // Tracing specifies the versioned configuration for OpenTelemetry tracing clients. + // See http://kep.k8s.io/2832 for more details. + // +featureGate=KubeletTracing + // +optional + Tracing *tracingapi.TracingConfiguration `json:"tracing,omitempty"` } type KubeletAuthorizationMode string diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index a584da5ac2c..5200815c753 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -31,6 +31,8 @@ import ( "strings" "time" + oteltrace "go.opentelemetry.io/otel/trace" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -298,7 +300,7 @@ func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService // connection timeout for CRI service connection const connectionTimeout = 2 * time.Minute runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint - r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout) + r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider()) if err != nil { return nil, nil, err } diff --git a/test/integration/apiserver/tracing/tracing_test.go b/test/integration/apiserver/tracing/tracing_test.go index bb771eca793..003e51d2269 100644 --- a/test/integration/apiserver/tracing/tracing_test.go +++ b/test/integration/apiserver/tracing/tracing_test.go @@ -34,7 +34,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" client "k8s.io/client-go/kubernetes" featuregatetesting "k8s.io/component-base/featuregate/testing" - "k8s.io/component-base/traces" + "k8s.io/component-base/tracing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -84,7 +84,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil { // Create a client that creates sampled traces. tp := trace.TracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))) - clientConfig.Wrap(traces.WrapperFor(&tp)) + clientConfig.Wrap(tracing.WrapperFor(tp)) clientSet, err := client.NewForConfig(clientConfig) if err != nil { t.Fatal(err)