kubelet tracing

Signed-off-by: Sally O'Malley <somalley@redhat.com>
Co-authored-by: David Ashpole <dashpole@google.com>
This commit is contained in:
Sally O'Malley 2021-10-10 09:17:27 -04:00
parent bddb4ec08e
commit 47e7d8034f
Failed to extract signature
39 changed files with 683 additions and 473 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/spf13/pflag" "github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
apiserveroptions "k8s.io/apiserver/pkg/server/options" apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/etcd3"
@ -153,10 +154,11 @@ func TestAddFlags(t *testing.T) {
StorageConfig: storagebackend.Config{ StorageConfig: storagebackend.Config{
Type: "etcd3", Type: "etcd3",
Transport: storagebackend.TransportConfig{ Transport: storagebackend.TransportConfig{
ServerList: nil, ServerList: nil,
KeyFile: "/var/run/kubernetes/etcd.key", KeyFile: "/var/run/kubernetes/etcd.key",
TrustedCAFile: "/var/run/kubernetes/etcdca.crt", TrustedCAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt", CertFile: "/var/run/kubernetes/etcdce.crt",
TracerProvider: oteltrace.NewNoopTracerProvider(),
}, },
Paging: true, Paging: true,
Prefix: "/registry", Prefix: "/registry",

View File

@ -31,6 +31,8 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
oteltrace "go.opentelemetry.io/otel/trace"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
@ -417,8 +419,10 @@ func buildGenericConfig(
if genericConfig.EgressSelector != nil { if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
} }
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil { if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
} }
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil { if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return return

View File

@ -39,6 +39,10 @@ import (
"k8s.io/mount-utils" "k8s.io/mount-utils"
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
otelsdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -68,6 +72,7 @@ import (
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics" "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
"k8s.io/component-base/version" "k8s.io/component-base/version"
"k8s.io/component-base/version/verflag" "k8s.io/component-base/version/verflag"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
@ -373,6 +378,13 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
if err != nil { if err != nil {
return nil, err return nil, err
} }
tp := oteltrace.NewNoopTracerProvider()
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
tp, err = newTracerProvider(s)
if err != nil {
return nil, err
}
}
return &kubelet.Dependencies{ return &kubelet.Dependencies{
Auth: nil, // default does not enforce auth[nz] Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
@ -381,6 +393,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
KubeClient: nil, KubeClient: nil,
HeartbeatClient: nil, HeartbeatClient: nil,
EventClient: nil, EventClient: nil,
TracerProvider: tp,
HostUtil: hu, HostUtil: hu,
Mounter: mounter, Mounter: mounter,
Subpather: subpather, Subpather: subpather,
@ -563,7 +576,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
klog.InfoS("Standalone mode, no API client") klog.InfoS("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil: case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName) clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
if err != nil { if err != nil {
return err return err
} }
@ -790,7 +803,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether // buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
// bootstrapping is enabled or client certificate rotation is enabled. // bootstrapping is enabled or client certificate rotation is enabled.
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) { func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) {
if s.RotateCertificates { if s.RotateCertificates {
// Rules for client rotation and the handling of kube config files: // Rules for client rotation and the handling of kube config files:
// //
@ -905,6 +918,9 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod
utilnet.CloseIdleConnectionsFor(clientConfig.Transport) utilnet.CloseIdleConnectionsFor(clientConfig.Transport)
} }
} }
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
clientConfig.Wrap(tracing.WrapperFor(tp))
}
return clientConfig, onHeartbeatFailure, nil return clientConfig, onHeartbeatFailure, nil
} }
@ -1167,7 +1183,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
// start the kubelet server // start the kubelet server
if enableServer { if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth) go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
} }
if kubeCfg.ReadOnlyPort > 0 { if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort)) go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
@ -1250,3 +1266,24 @@ func parseResourceList(m map[string]string) (v1.ResourceList, error) {
} }
return rl, nil return rl, nil
} }
func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, error) {
if s.KubeletConfiguration.Tracing == nil {
return oteltrace.NewNoopTracerProvider(), nil
}
hostname, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return nil, fmt.Errorf("could not determine hostname for tracer provider: %v", err)
}
resourceOpts := []otelsdkresource.Option{
otelsdkresource.WithAttributes(
semconv.ServiceNameKey.String(componentKubelet),
semconv.HostNameKey.String(hostname),
),
}
tp, err := tracing.NewProvider(context.Background(), s.KubeletConfiguration.Tracing, []otlpgrpc.Option{}, resourceOpts)
if err != nil {
return nil, fmt.Errorf("could not configure tracer provider: %v", err)
}
return tp, nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -246,7 +247,7 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) error {
return fmt.Errorf("Failed to start fake runtime, error: %w", err) return fmt.Errorf("Failed to start fake runtime, error: %w", err)
} }
defer fakeRemoteRuntime.Stop() defer fakeRemoteRuntime.Stop()
runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second) runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider())
if err != nil { if err != nil {
return fmt.Errorf("Failed to init runtime service, error: %w", err) return fmt.Errorf("Failed to init runtime service, error: %w", err)
} }

View File

@ -498,6 +498,13 @@ const (
// Enable POD resources API to return allocatable resources // Enable POD resources API to return allocatable resources
KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable" KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable"
// owner: @sallyom
// kep: http://kep.k8s.io/2832
// alpha: v1.25
//
// Add support for distributed tracing in the kubelet
KubeletTracing featuregate.Feature = "KubeletTracing"
// owner: @zshihang // owner: @zshihang
// kep: http://kep.k8s.io/2800 // kep: http://kep.k8s.io/2800
// beta: v1.24 // beta: v1.24
@ -961,6 +968,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
KubeletPodResourcesGetAllocatable: {Default: true, PreRelease: featuregate.Beta}, KubeletPodResourcesGetAllocatable: {Default: true, PreRelease: featuregate.Beta},
KubeletTracing: {Default: false, PreRelease: featuregate.Alpha},
LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta}, LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta},
LocalStorageCapacityIsolation: {Default: true, PreRelease: featuregate.Beta}, LocalStorageCapacityIsolation: {Default: true, PreRelease: featuregate.Beta},

View File

@ -47,7 +47,7 @@ type Config struct {
} }
// New sets up the plugins and admission start hooks needed for admission // New sets up the plugins and admission start hooks needed for admission
func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp *trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) { func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp) webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp)
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver) webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver)

View File

@ -280,5 +280,7 @@ var (
"ShutdownGracePeriod.Duration", "ShutdownGracePeriod.Duration",
"ShutdownGracePeriodCriticalPods.Duration", "ShutdownGracePeriodCriticalPods.Duration",
"MemoryThrottlingFactor", "MemoryThrottlingFactor",
"Tracing.Endpoint",
"Tracing.SamplingRatePerMillion",
) )
) )

View File

@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
componentconfigtesting "k8s.io/component-base/config/testing" componentconfigtesting "k8s.io/component-base/config/testing"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
) )
func TestComponentConfigSetup(t *testing.T) { func TestComponentConfigSetup(t *testing.T) {
@ -33,11 +34,12 @@ func TestComponentConfigSetup(t *testing.T) {
SchemeGroupVersion: SchemeGroupVersion, SchemeGroupVersion: SchemeGroupVersion,
AddToScheme: AddToScheme, AddToScheme: AddToScheme,
AllowedTags: map[reflect.Type]bool{ AllowedTags: map[reflect.Type]bool{
reflect.TypeOf(logsapi.LoggingConfiguration{}): true, reflect.TypeOf(logsapi.LoggingConfiguration{}): true,
reflect.TypeOf(metav1.Duration{}): true, reflect.TypeOf(tracingapi.TracingConfiguration{}): true,
reflect.TypeOf(metav1.TypeMeta{}): true, reflect.TypeOf(metav1.Duration{}): true,
reflect.TypeOf(v1.NodeConfigSource{}): true, reflect.TypeOf(metav1.TypeMeta{}): true,
reflect.TypeOf(v1.Taint{}): true, reflect.TypeOf(v1.NodeConfigSource{}): true,
reflect.TypeOf(v1.Taint{}): true,
}, },
} }

View File

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
) )
// HairpinMode denotes how the kubelet should configure networking to handle // HairpinMode denotes how the kubelet should configure networking to handle
@ -441,10 +442,14 @@ type KubeletConfiguration struct {
// is true and upon the initial registration of the node. // is true and upon the initial registration of the node.
// +optional // +optional
RegisterWithTaints []v1.Taint RegisterWithTaints []v1.Taint
// registerNode enables automatic registration with the apiserver. // registerNode enables automatic registration with the apiserver.
// +optional // +optional
RegisterNode bool RegisterNode bool
// Tracing specifies the versioned configuration for OpenTelemetry tracing clients.
// See http://kep.k8s.io/2832 for more details.
// +featureGate=KubeletTracing
// +optional
Tracing *tracingapi.TracingConfiguration
} }
// KubeletAuthorizationMode denotes the authorization mode for the kubelet // KubeletAuthorizationMode denotes the authorization mode for the kubelet

View File

@ -27,6 +27,7 @@ import (
"k8s.io/component-base/featuregate" "k8s.io/component-base/featuregate"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics" "k8s.io/component-base/metrics"
tracingapi "k8s.io/component-base/tracing/api/v1"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@ -241,6 +242,14 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
allErrors = append(allErrors, errs.ToAggregate().Errors()...) allErrors = append(allErrors, errs.ToAggregate().Errors()...)
} }
if localFeatureGate.Enabled(features.KubeletTracing) {
if errs := tracingapi.ValidateTracingConfiguration(kc.Tracing, localFeatureGate, field.NewPath("tracing")); len(errs) > 0 {
allErrors = append(allErrors, errs.ToAggregate().Errors()...)
}
} else if kc.Tracing != nil {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled."))
}
if localFeatureGate.Enabled(features.MemoryQoS) && kc.MemoryThrottlingFactor == nil { if localFeatureGate.Enabled(features.MemoryQoS) && kc.MemoryThrottlingFactor == nil {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: memoryThrottlingFactor is required when MemoryQoS feature flag is enabled")) allErrors = append(allErrors, fmt.Errorf("invalid configuration: memoryThrottlingFactor is required when MemoryQoS feature flag is enabled"))
} }

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/validation" "k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -497,6 +498,36 @@ func TestValidateKubeletConfiguration(t *testing.T) {
}, },
errMsg: "invalid configuration: taint.TimeAdded is not nil", errMsg: "invalid configuration: taint.TimeAdded is not nil",
}, },
{
name: "specify tracing with KubeletTracing disabled",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
samplingRate := int32(99999)
conf.FeatureGates = map[string]bool{"KubeletTracing": false}
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
return conf
},
errMsg: "invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled.",
},
{
name: "specify tracing invalid sampling rate",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
samplingRate := int32(-1)
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
return conf
},
errMsg: "tracing.samplingRatePerMillion: Invalid value: -1: sampling rate must be positive",
},
{
name: "specify tracing invalid endpoint",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
ep := "dn%2s://localhost:4317"
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
conf.Tracing = &tracingapi.TracingConfiguration{Endpoint: &ep}
return conf
},
errMsg: "tracing.endpoint: Invalid value: \"dn%2s://localhost:4317\": parse \"dn%2s://localhost:4317\": first path segment in URL cannot contain colon",
},
} }
for _, tc := range cases { for _, tc := range cases {

View File

@ -23,16 +23,20 @@ import (
"strings" "strings"
"time" "time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"k8s.io/klog/v2" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/logs/logreduction" "k8s.io/component-base/logs/logreduction"
tracing "k8s.io/component-base/tracing"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe/exec" "k8s.io/kubernetes/pkg/probe/exec"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
@ -68,7 +72,7 @@ const (
) )
// NewRemoteRuntimeService creates a new internalapi.RuntimeService. // NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) { func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint) klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
addr, dialer, err := util.GetAddressAndDialer(endpoint) addr, dialer, err := util.GetAddressAndDialer(endpoint)
if err != nil { if err != nil {
@ -77,10 +81,23 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel() defer cancel()
conn, err := grpc.DialContext(ctx, addr, dialOpts := []grpc.DialOption{}
dialOpts = append(dialOpts,
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer), grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
tracingOpts := []otelgrpc.Option{
otelgrpc.WithPropagators(tracing.Propagators()),
otelgrpc.WithTracerProvider(tp),
}
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
}
conn, err := grpc.DialContext(ctx, addr, dialOpts...)
if err != nil { if err != nil {
klog.ErrorS(err, "Connect remote runtime failed", "address", addr) klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
return nil, err return nil, err

View File

@ -21,6 +21,8 @@ import (
"testing" "testing"
"time" "time"
oteltrace "go.opentelemetry.io/otel/trace"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
@ -47,7 +49,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
} }
func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService { func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService {
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout) runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider())
require.NoError(t, err) require.NoError(t, err)
return runtimeService return runtimeService

View File

@ -38,6 +38,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns" libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
"go.opentelemetry.io/otel/trace"
"k8s.io/mount-utils" "k8s.io/mount-utils"
"k8s.io/utils/integer" "k8s.io/utils/integer"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
@ -206,7 +207,7 @@ type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry() BirthCry()
StartGarbageCollection() StartGarbageCollection()
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(address net.IP, port uint) ListenAndServeReadOnly(address net.IP, port uint)
ListenAndServePodResources() ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate) Run(<-chan kubetypes.PodUpdate)
@ -236,6 +237,7 @@ type Dependencies struct {
ProbeManager prober.Manager ProbeManager prober.Manager
Recorder record.EventRecorder Recorder record.EventRecorder
Subpather subpath.Interface Subpather subpath.Interface
TracerProvider trace.TracerProvider
VolumePlugins []volume.VolumePlugin VolumePlugins []volume.VolumePlugin
DynamicPluginProber volume.DynamicPluginProber DynamicPluginProber volume.DynamicPluginProber
TLSOptions *server.TLSOptions TLSOptions *server.TLSOptions
@ -293,7 +295,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
} }
var err error var err error
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil { if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err return err
} }
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil { if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
@ -2395,8 +2397,8 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server. // ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
auth server.AuthInterface) { auth server.AuthInterface, tp trace.TracerProvider) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth) server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
} }
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.

View File

@ -37,6 +37,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorv2 "github.com/google/cadvisor/info/v2" cadvisorv2 "github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/metrics" "github.com/google/cadvisor/metrics"
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
@ -63,6 +64,7 @@ import (
compbasemetrics "k8s.io/component-base/metrics" compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
tracing "k8s.io/component-base/tracing"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1" podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
@ -127,10 +129,13 @@ type containerInterface interface {
// so we can ensure restful.FilterFunctions are used for all handlers // so we can ensure restful.FilterFunctions are used for all handlers
type filteringContainer struct { type filteringContainer struct {
*restful.Container *restful.Container
oteltrace.TracerProvider
registeredHandlePaths []string registeredHandlePaths []string
} }
func (a *filteringContainer) Handle(path string, handler http.Handler) { func (a *filteringContainer) Handle(path string, handler http.Handler) {
handler = tracing.WithTracing(handler, a.TracerProvider, "kubelet")
a.HandleWithFilter(path, handler) a.HandleWithFilter(path, handler)
a.registeredHandlePaths = append(a.registeredHandlePaths, path) a.registeredHandlePaths = append(a.registeredHandlePaths, path)
} }
@ -144,12 +149,13 @@ func ListenAndServeKubeletServer(
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeCfg *kubeletconfiginternal.KubeletConfiguration,
tlsOptions *TLSOptions, tlsOptions *TLSOptions,
auth AuthInterface) { auth AuthInterface,
tp oteltrace.TracerProvider) {
address := netutils.ParseIPSloppy(kubeCfg.Address) address := netutils.ParseIPSloppy(kubeCfg.Address)
port := uint(kubeCfg.Port) port := uint(kubeCfg.Port)
klog.InfoS("Starting to listen", "address", address, "port", port) klog.InfoS("Starting to listen", "address", address, "port", port)
handler := NewServer(host, resourceAnalyzer, auth, kubeCfg) handler := NewServer(host, resourceAnalyzer, auth, tp, kubeCfg)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -158,6 +164,7 @@ func ListenAndServeKubeletServer(
WriteTimeout: 4 * 60 * time.Minute, WriteTimeout: 4 * 60 * time.Minute,
MaxHeaderBytes: 1 << 20, MaxHeaderBytes: 1 << 20,
} }
if tlsOptions != nil { if tlsOptions != nil {
s.TLSConfig = tlsOptions.Config s.TLSConfig = tlsOptions.Config
// Passing empty strings as the cert and key files means no // Passing empty strings as the cert and key files means no
@ -174,9 +181,14 @@ func ListenAndServeKubeletServer(
} }
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) { func ListenAndServeKubeletReadOnlyServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
address net.IP,
port uint) {
klog.InfoS("Starting to listen read-only", "address", address, "port", port) klog.InfoS("Starting to listen read-only", "address", address, "port", port)
s := NewServer(host, resourceAnalyzer, nil, nil) // TODO: https://github.com/kubernetes/kubernetes/issues/109829 tracer should use WithPublicEndpoint
s := NewServer(host, resourceAnalyzer, nil, oteltrace.NewNoopTracerProvider(), nil)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -241,12 +253,15 @@ func NewServer(
host HostInterface, host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface, auth AuthInterface,
tp oteltrace.TracerProvider,
kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server { kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
rc := &filteringContainer{Container: restful.NewContainer(), TracerProvider: tp}
server := Server{ server := Server{
host: host, host: host,
resourceAnalyzer: resourceAnalyzer, resourceAnalyzer: resourceAnalyzer,
auth: auth, auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()}, restfulCont: rc,
metricsBuckets: sets.NewString(), metricsBuckets: sets.NewString(),
metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"), metricsMethodBuckets: sets.NewString("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
} }

View File

@ -37,6 +37,7 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2" cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -358,6 +359,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
fw.fakeKubelet, fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
fw.fakeAuth, fw.fakeAuth,
oteltrace.NewNoopTracerProvider(),
kubeCfg, kubeCfg,
) )
fw.serverUnderTest = &server fw.serverUnderTest = &server

View File

@ -23,6 +23,7 @@ import (
"net/url" "net/url"
"github.com/spf13/pflag" "github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@ -111,7 +112,7 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
ExtraConfig: apiserver.ExtraConfig{ ExtraConfig: apiserver.ExtraConfig{
CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd), CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd),
ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()}, ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()},
AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, nil), AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, nil, serverConfig.LoopbackClientConfig, oteltrace.NewNoopTracerProvider()),
}, },
} }
return config, nil return config, nil

View File

@ -22,19 +22,17 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"k8s.io/component-base/traces" tracing "k8s.io/component-base/tracing"
) )
// WithTracing adds tracing to requests if the incoming request is sampled // WithTracing adds tracing to requests if the incoming request is sampled
func WithTracing(handler http.Handler, tp *trace.TracerProvider) http.Handler { func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler {
opts := []otelhttp.Option{ opts := []otelhttp.Option{
otelhttp.WithPropagators(traces.Propagators()), otelhttp.WithPropagators(tracing.Propagators()),
otelhttp.WithPublicEndpoint(), otelhttp.WithPublicEndpoint(),
otelhttp.WithTracerProvider(tp),
} }
if tp != nil { // With Noop TracerProvider, the otelhttp still handles context propagation.
opts = append(opts, otelhttp.WithTracerProvider(*tp))
}
// Even if there is no TracerProvider, the otelhttp still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
return otelhttp.NewHandler(handler, "KubernetesAPI", opts...) return otelhttp.NewHandler(handler, "KubernetesAPI", opts...)
} }

View File

@ -30,7 +30,7 @@ import (
jsonpatch "github.com/evanphx/json-patch" jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid" "github.com/google/uuid"
"go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -139,7 +139,7 @@ type Config struct {
ExternalAddress string ExternalAddress string
// TracerProvider can provide a tracer, which records spans for distributed tracing. // TracerProvider can provide a tracer, which records spans for distributed tracing.
TracerProvider *trace.TracerProvider TracerProvider oteltrace.TracerProvider
//=========================================================================== //===========================================================================
// Fields you probably don't care about changing // Fields you probably don't care about changing
@ -372,6 +372,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
APIServerID: id, APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(), StorageVersionManager: storageversion.NewDefaultManager(),
TracerProvider: oteltrace.NewNoopTracerProvider(),
} }
} }

View File

@ -28,7 +28,7 @@ import (
clientgoclientset "k8s.io/client-go/kubernetes" clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/component-base/traces" tracing "k8s.io/component-base/tracing"
) )
// CoreAPIOptions contains options to configure the connection to a core API Kubernetes apiserver. // CoreAPIOptions contains options to configure the connection to a core API Kubernetes apiserver.
@ -73,7 +73,7 @@ func (o *CoreAPIOptions) ApplyTo(config *server.RecommendedConfig) error {
} }
} }
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
kubeconfig.Wrap(traces.WrapperFor(config.TracerProvider)) kubeconfig.Wrap(tracing.WrapperFor(config.TracerProvider))
} }
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeconfig) clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeconfig)
if err != nil { if err != nil {

View File

@ -107,10 +107,8 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.EgressSelector.ApplyTo(&config.Config); err != nil { if err := o.EgressSelector.ApplyTo(&config.Config); err != nil {
return err return err
} }
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil {
if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil { return err
return err
}
} }
if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil { if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
return err return err

View File

@ -19,24 +19,39 @@ package options
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv" "go.opentelemetry.io/otel/semconv"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/apis/apiserver"
"k8s.io/apiserver/pkg/apis/apiserver/install"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/tracing" "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/traces" tracing "k8s.io/component-base/tracing"
tracingapi "k8s.io/component-base/tracing/api/v1"
"k8s.io/utils/path" "k8s.io/utils/path"
) )
const apiserverService = "apiserver" const apiserverService = "apiserver"
var (
cfgScheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(cfgScheme)
)
func init() {
install.Install(cfgScheme)
}
// TracingOptions contain configuration options for tracing // TracingOptions contain configuration options for tracing
// exporters // exporters
type TracingOptions struct { type TracingOptions struct {
@ -64,21 +79,21 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co
if o == nil || o.ConfigFile == "" { if o == nil || o.ConfigFile == "" {
return nil return nil
} }
if !feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
return fmt.Errorf("APIServerTracing feature is not enabled, but tracing config file was provided")
}
npConfig, err := tracing.ReadTracingConfiguration(o.ConfigFile) traceConfig, err := ReadTracingConfiguration(o.ConfigFile)
if err != nil { if err != nil {
return fmt.Errorf("failed to read tracing config: %v", err) return fmt.Errorf("failed to read tracing config: %v", err)
} }
errs := tracing.ValidateTracingConfiguration(npConfig) errs := tracingapi.ValidateTracingConfiguration(traceConfig, feature.DefaultFeatureGate, nil)
if len(errs) > 0 { if len(errs) > 0 {
return fmt.Errorf("failed to validate tracing configuration: %v", errs.ToAggregate()) return fmt.Errorf("failed to validate tracing configuration: %v", errs.ToAggregate())
} }
opts := []otlpgrpc.Option{} opts := []otlpgrpc.Option{}
if npConfig.Endpoint != nil {
opts = append(opts, otlpgrpc.WithEndpoint(*npConfig.Endpoint))
}
if es != nil { if es != nil {
// Only use the egressselector dialer if egressselector is enabled. // Only use the egressselector dialer if egressselector is enabled.
// Endpoint is on the "ControlPlane" network // Endpoint is on the "ControlPlane" network
@ -93,21 +108,19 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co
opts = append(opts, otlpgrpc.WithDialOption(grpc.WithContextDialer(otelDialer))) opts = append(opts, otlpgrpc.WithDialOption(grpc.WithContextDialer(otelDialer)))
} }
sampler := sdktrace.NeverSample()
if npConfig.SamplingRatePerMillion != nil && *npConfig.SamplingRatePerMillion > 0 {
sampler = sdktrace.TraceIDRatioBased(float64(*npConfig.SamplingRatePerMillion) / float64(1000000))
}
resourceOpts := []resource.Option{ resourceOpts := []resource.Option{
resource.WithAttributes( resource.WithAttributes(
semconv.ServiceNameKey.String(apiserverService), semconv.ServiceNameKey.String(apiserverService),
semconv.ServiceInstanceIDKey.String(c.APIServerID), semconv.ServiceInstanceIDKey.String(c.APIServerID),
), ),
} }
tp := traces.NewProvider(context.Background(), sampler, resourceOpts, opts...) tp, err := tracing.NewProvider(context.Background(), traceConfig, opts, resourceOpts)
c.TracerProvider = &tp if err != nil {
return err
}
c.TracerProvider = tp
if c.LoopbackClientConfig != nil { if c.LoopbackClientConfig != nil {
c.LoopbackClientConfig.Wrap(traces.WrapperFor(c.TracerProvider)) c.LoopbackClientConfig.Wrap(tracing.WrapperFor(c.TracerProvider))
} }
return nil return nil
} }
@ -125,3 +138,24 @@ func (o *TracingOptions) Validate() (errs []error) {
} }
return return
} }
// ReadTracingConfiguration reads the tracing configuration from a file
func ReadTracingConfiguration(configFilePath string) (*tracingapi.TracingConfiguration, error) {
if configFilePath == "" {
return nil, fmt.Errorf("tracing config file was empty")
}
data, err := ioutil.ReadFile(configFilePath)
if err != nil {
return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err)
}
internalConfig := &apiserver.TracingConfiguration{}
// this handles json/yaml/whatever, and decodes all registered version to the internal version
if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil {
return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err)
}
tc := &tracingapi.TracingConfiguration{
Endpoint: internalConfig.Endpoint,
SamplingRatePerMillion: internalConfig.SamplingRatePerMillion,
}
return tc, nil
}

View File

@ -16,7 +16,26 @@ limitations under the License.
package options package options
import "testing" import (
"fmt"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
tracingapi "k8s.io/component-base/tracing/api/v1"
)
var (
localhost = "localhost:4317"
ipAddress = "127.0.0.1:4317"
samplingRate = int32(12345)
)
func strptr(s string) *string {
return &s
}
func TestValidateTracingOptions(t *testing.T) { func TestValidateTracingOptions(t *testing.T) {
testcases := []struct { testcases := []struct {
@ -56,3 +75,110 @@ func TestValidateTracingOptions(t *testing.T) {
}) })
} }
} }
func TestReadTracingConfiguration(t *testing.T) {
testcases := []struct {
name string
contents string
createFile bool
expectedResult *tracingapi.TracingConfiguration
expectedError *string
}{
{
name: "empty",
createFile: true,
contents: ``,
expectedResult: &tracingapi.TracingConfiguration{},
expectedError: nil,
},
{
name: "absent",
createFile: false,
contents: ``,
expectedResult: nil,
expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"),
},
{
name: "v1alpha1",
createFile: true,
contents: `
apiVersion: apiserver.config.k8s.io/v1alpha1
kind: TracingConfiguration
endpoint: localhost:4317
samplingRatePerMillion: 12345
`,
expectedResult: &tracingapi.TracingConfiguration{
Endpoint: &localhost,
SamplingRatePerMillion: &samplingRate,
},
expectedError: nil,
},
{
name: "ip address",
createFile: true,
contents: `
apiVersion: apiserver.config.k8s.io/v1alpha1
kind: TracingConfiguration
endpoint: 127.0.0.1:4317
`,
expectedResult: &tracingapi.TracingConfiguration{
Endpoint: &ipAddress,
},
expectedError: nil,
},
{
name: "wrong_type",
createFile: true,
contents: `
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: agent
spec:
selector:
matchLabels:
k8s-app: agent
template:
metadata:
labels:
k8s-app: agent
spec:
containers:
- image: k8s.gcr.io/busybox
name: agent
`,
expectedResult: nil,
expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"),
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name)
if tc.createFile {
f, err := ioutil.TempFile("", proxyConfig)
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
proxyConfig = f.Name()
}
config, err := ReadTracingConfiguration(proxyConfig)
if err == nil && tc.expectedError != nil {
t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError)
}
if err != nil && tc.expectedError == nil {
t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err)
}
if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) {
t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err)
}
if !reflect.DeepEqual(config, tc.expectedResult) {
t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config)
}
})
}
}

View File

@ -19,7 +19,8 @@ package storagebackend
import ( import (
"time" "time"
"go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
@ -50,7 +51,7 @@ type TransportConfig struct {
// function to determine the egress dialer. (i.e. konnectivity server dialer) // function to determine the egress dialer. (i.e. konnectivity server dialer)
EgressLookup egressselector.Lookup EgressLookup egressselector.Lookup
// The TracerProvider can add tracing the connection // The TracerProvider can add tracing the connection
TracerProvider *trace.TracerProvider TracerProvider oteltrace.TracerProvider
} }
// Config is configuration for creating a storage backend. // Config is configuration for creating a storage backend.
@ -122,5 +123,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
HealthcheckTimeout: DefaultHealthcheckTimeout, HealthcheckTimeout: DefaultHealthcheckTimeout,
ReadycheckTimeout: DefaultReadinessTimeout, ReadycheckTimeout: DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(), LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
Transport: TransportConfig{TracerProvider: oteltrace.NewNoopTracerProvider()},
} }
} }

View File

@ -45,7 +45,7 @@ import (
"k8s.io/apiserver/pkg/storage/value" "k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/traces" tracing "k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -191,12 +191,10 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
} }
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) { if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
tracingOpts := []otelgrpc.Option{ tracingOpts := []otelgrpc.Option{
otelgrpc.WithPropagators(traces.Propagators()), otelgrpc.WithPropagators(tracing.Propagators()),
otelgrpc.WithTracerProvider(c.TracerProvider),
} }
if c.TracerProvider != nil { // Even with Noop TracerProvider, the otelgrpc still handles context propagation.
tracingOpts = append(tracingOpts, otelgrpc.WithTracerProvider(*c.TracerProvider))
}
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
dialOptions = append(dialOptions, dialOptions = append(dialOptions,
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),

View File

@ -25,6 +25,7 @@ import (
"testing" "testing"
"go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/transport"
oteltrace "go.opentelemetry.io/otel/trace"
apitesting "k8s.io/apimachinery/pkg/api/apitesting" apitesting "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -72,10 +73,11 @@ func TestTLSConnection(t *testing.T) {
cfg := storagebackend.Config{ cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3, Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{ Transport: storagebackend.TransportConfig{
ServerList: client.Endpoints(), ServerList: client.Endpoints(),
CertFile: certFile, CertFile: certFile,
KeyFile: keyFile, KeyFile: keyFile,
TrustedCAFile: caFile, TrustedCAFile: caFile,
TracerProvider: oteltrace.NewNoopTracerProvider(),
}, },
Codec: codec, Codec: codec,
} }

View File

@ -1,270 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"fmt"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/apiserver"
)
var (
localhost = "localhost:4317"
ipAddress = "127.0.0.1:4317"
samplingRate = int32(12345)
)
func strptr(s string) *string {
return &s
}
func TestReadTracingConfiguration(t *testing.T) {
testcases := []struct {
name string
contents string
createFile bool
expectedResult *apiserver.TracingConfiguration
expectedError *string
}{
{
name: "empty",
createFile: true,
contents: ``,
expectedResult: &apiserver.TracingConfiguration{},
expectedError: nil,
},
{
name: "absent",
createFile: false,
contents: ``,
expectedResult: nil,
expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"),
},
{
name: "v1alpha1",
createFile: true,
contents: `
apiVersion: apiserver.config.k8s.io/v1alpha1
kind: TracingConfiguration
endpoint: localhost:4317
samplingRatePerMillion: 12345
`,
expectedResult: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &localhost,
SamplingRatePerMillion: &samplingRate,
},
expectedError: nil,
},
{
name: "ip address",
createFile: true,
contents: `
apiVersion: apiserver.config.k8s.io/v1alpha1
kind: TracingConfiguration
endpoint: 127.0.0.1:4317
`,
expectedResult: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &ipAddress,
},
expectedError: nil,
},
{
name: "wrong_type",
createFile: true,
contents: `
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: agent
spec:
selector:
matchLabels:
k8s-app: agent
template:
metadata:
labels:
k8s-app: agent
spec:
containers:
- image: registry.k8s.io/busybox
name: agent
`,
expectedResult: nil,
expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"),
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name)
if tc.createFile {
f, err := ioutil.TempFile("", proxyConfig)
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
proxyConfig = f.Name()
}
config, err := ReadTracingConfiguration(proxyConfig)
if err == nil && tc.expectedError != nil {
t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError)
}
if err != nil && tc.expectedError == nil {
t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err)
}
if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) {
t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err)
}
if !reflect.DeepEqual(config, tc.expectedResult) {
t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config)
}
})
}
}
func TestValidateTracingConfiguration(t *testing.T) {
samplingRate := int32(12378)
negativeRate := int32(-1)
tooHighRate := int32(1000001)
validEndpoint := "localhost:4317"
dnsEndpoint := "dns://google.com:4317"
unixEndpoint := "unix://path/to/socket"
invalidURL := "dn%2s://localhost:4317"
httpEndpoint := "http://localhost:4317"
testcases := []struct {
name string
expectError bool
contents *apiserver.TracingConfiguration
}{
{
name: "sampling-rate-valid",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &samplingRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &negativeRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &tooHighRate,
},
},
{
name: "default Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &validEndpoint,
},
},
{
name: "dns Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &dnsEndpoint,
},
},
{
name: "unix Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &unixEndpoint,
},
},
{
name: "invalid Endpoint",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &httpEndpoint,
},
},
{
name: "invalid url",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &invalidURL,
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
errs := ValidateTracingConfiguration(tc.contents)
if tc.expectError == false && len(errs) != 0 {
t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs)
} else if tc.expectError == true && len(errs) == 0 {
t.Errorf("Calling ValidateTracingConfiguration expected error, got no error")
}
})
}
}

View File

@ -35,7 +35,7 @@ import (
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api" clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/traces" tracing "k8s.io/component-base/tracing"
) )
// AuthenticationInfoResolverWrapper can be used to inject Dial function to the // AuthenticationInfoResolverWrapper can be used to inject Dial function to the
@ -47,7 +47,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
proxyTransport *http.Transport, proxyTransport *http.Transport,
egressSelector *egressselector.EgressSelector, egressSelector *egressselector.EgressSelector,
kubeapiserverClientConfig *rest.Config, kubeapiserverClientConfig *rest.Config,
tp *trace.TracerProvider) AuthenticationInfoResolverWrapper { tp trace.TracerProvider) AuthenticationInfoResolverWrapper {
webhookAuthResolverWrapper := func(delegate AuthenticationInfoResolver) AuthenticationInfoResolver { webhookAuthResolverWrapper := func(delegate AuthenticationInfoResolver) AuthenticationInfoResolver {
return &AuthenticationInfoResolverDelegator{ return &AuthenticationInfoResolverDelegator{
@ -60,7 +60,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
return nil, err return nil, err
} }
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
ret.Wrap(traces.WrapperFor(tp)) ret.Wrap(tracing.WrapperFor(tp))
} }
if egressSelector != nil { if egressSelector != nil {
@ -85,7 +85,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
return nil, err return nil, err
} }
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) { if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
ret.Wrap(traces.WrapperFor(tp)) ret.Wrap(tracing.WrapperFor(tp))
} }
if egressSelector != nil { if egressSelector != nil {

View File

@ -1,81 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package traces
import (
"context"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
)
// NewProvider initializes tracing in the component, and enforces recommended tracing behavior.
func NewProvider(ctx context.Context, baseSampler sdktrace.Sampler, resourceOpts []resource.Option, opts ...otlpgrpc.Option) trace.TracerProvider {
opts = append(opts, otlpgrpc.WithInsecure())
driver := otlpgrpc.NewDriver(opts...)
exporter, err := otlp.NewExporter(ctx, driver)
if err != nil {
klog.Fatalf("Failed to create OTLP exporter: %v", err)
}
res, err := resource.New(ctx, resourceOpts...)
if err != nil {
klog.Fatalf("Failed to create resource: %v", err)
}
bsp := sdktrace.NewBatchSpanProcessor(exporter)
return sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.ParentBased(baseSampler)),
sdktrace.WithSpanProcessor(bsp),
sdktrace.WithResource(res),
)
}
// WrapperFor can be used to add tracing to a *rest.Config. Example usage:
//
// tp := traces.NewProvider(...)
// config, _ := rest.InClusterConfig()
// config.Wrap(traces.WrapperFor(&tp))
// kubeclient, _ := clientset.NewForConfig(config)
func WrapperFor(tp *trace.TracerProvider) transport.WrapperFunc {
return func(rt http.RoundTripper) http.RoundTripper {
opts := []otelhttp.Option{
otelhttp.WithPropagators(Propagators()),
}
if tp != nil {
opts = append(opts, otelhttp.WithTracerProvider(*tp))
}
// Even if there is no TracerProvider, the otelhttp still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
return otelhttp.NewTransport(rt, opts...)
}
}
// Propagators returns the recommended set of propagators.
func Propagators() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
}

View File

@ -0,0 +1,8 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-instrumentation-approvers
reviewers:
- sig-instrumentation-reviewers
labels:
- sig/instrumentation

View File

@ -14,63 +14,32 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package tracing package v1
import ( import (
"fmt" "fmt"
"io/ioutil"
"net/url" "net/url"
"strings" "strings"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/apis/apiserver" "k8s.io/component-base/featuregate"
"k8s.io/apiserver/pkg/apis/apiserver/install"
)
const (
maxSamplingRatePerMillion = 1000000
) )
var ( var (
cfgScheme = runtime.NewScheme() maxSamplingRatePerMillion = int32(1000000)
codecs = serializer.NewCodecFactory(cfgScheme)
) )
func init() {
install.Install(cfgScheme)
}
// ReadTracingConfiguration reads the tracing configuration from a file
func ReadTracingConfiguration(configFilePath string) (*apiserver.TracingConfiguration, error) {
if configFilePath == "" {
return nil, fmt.Errorf("tracing config file was empty")
}
data, err := ioutil.ReadFile(configFilePath)
if err != nil {
return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err)
}
internalConfig := &apiserver.TracingConfiguration{}
// this handles json/yaml/whatever, and decodes all registered version to the internal version
if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil {
return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err)
}
return internalConfig, nil
}
// ValidateTracingConfiguration validates the tracing configuration // ValidateTracingConfiguration validates the tracing configuration
func ValidateTracingConfiguration(config *apiserver.TracingConfiguration) field.ErrorList { func ValidateTracingConfiguration(traceConfig *TracingConfiguration, featureGate featuregate.FeatureGate, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{} allErrs := field.ErrorList{}
if config == nil { if traceConfig == nil {
// Tracing is disabled
return allErrs return allErrs
} }
if config.SamplingRatePerMillion != nil { if traceConfig.SamplingRatePerMillion != nil {
allErrs = append(allErrs, validateSamplingRate(*config.SamplingRatePerMillion, field.NewPath("samplingRatePerMillion"))...) allErrs = append(allErrs, validateSamplingRate(*traceConfig.SamplingRatePerMillion, fldPath.Child("samplingRatePerMillion"))...)
} }
if config.Endpoint != nil { if traceConfig.Endpoint != nil {
allErrs = append(allErrs, validateEndpoint(*config.Endpoint, field.NewPath("endpoint"))...) allErrs = append(allErrs, validateEndpoint(*traceConfig.Endpoint, fldPath.Child("endpoint"))...)
} }
return allErrs return allErrs
} }

View File

@ -0,0 +1,107 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"testing"
"k8s.io/apimachinery/pkg/util/validation/field"
)
func TestValidateTracingConfiguration(t *testing.T) {
samplingRate := int32(12378)
negativeRate := int32(-1)
tooHighRate := int32(1000001)
validEndpoint := "localhost:4317"
dnsEndpoint := "dns://google.com:4317"
unixEndpoint := "unix://path/to/socket"
invalidURL := "dn%2s://localhost:4317"
httpEndpoint := "http://localhost:4317"
testcases := []struct {
name string
expectError bool
contents *TracingConfiguration
}{
{
name: "sampling-rate-valid",
expectError: false,
contents: &TracingConfiguration{
SamplingRatePerMillion: &samplingRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &TracingConfiguration{
SamplingRatePerMillion: &negativeRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &TracingConfiguration{
SamplingRatePerMillion: &tooHighRate,
},
},
{
name: "default Endpoint",
expectError: false,
contents: &TracingConfiguration{
Endpoint: &validEndpoint,
},
},
{
name: "dns Endpoint",
expectError: false,
contents: &TracingConfiguration{
Endpoint: &dnsEndpoint,
},
},
{
name: "unix Endpoint",
expectError: false,
contents: &TracingConfiguration{
Endpoint: &unixEndpoint,
},
},
{
name: "invalid Endpoint",
expectError: true,
contents: &TracingConfiguration{
Endpoint: &httpEndpoint,
},
},
{
name: "invalid url",
expectError: true,
contents: &TracingConfiguration{
Endpoint: &invalidURL,
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
errs := ValidateTracingConfiguration(tc.contents, nil, field.NewPath("tracing"))
if tc.expectError == false && len(errs) != 0 {
t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs)
} else if tc.expectError == true && len(errs) == 0 {
t.Errorf("Calling ValidateTracingConfiguration expected error, got no error")
}
})
}
}

View File

@ -0,0 +1,29 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
// Package v1 contains the configuration API for tracing.
//
// The intention is to only have a single version of this API, potentially with
// new fields added over time in a backwards-compatible manner. Fields for
// alpha or beta features are allowed as long as they are defined so that not
// changing the defaults leaves those features disabled.
//
// The "v1" package name is just a reminder that API compatibility rules apply,
// not an indication of the stability of all features covered by it.
package v1 // import "k8s.io/component-base/tracing/api/v1"

View File

@ -0,0 +1,32 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
// TracingConfiguration provides versioned configuration for OpenTelemetry tracing clients.
type TracingConfiguration struct {
// Endpoint of the collector this component will report traces to.
// The connection is insecure, and does not currently support TLS.
// Recommended is unset, and endpoint is the otlp grpc default, localhost:4317.
// +optional
Endpoint *string `json:"endpoint,omitempty"`
// SamplingRatePerMillion is the number of samples to collect per million spans.
// Recommended is unset. If unset, sampler respects its parent span's sampling
// rate, but otherwise never samples.
// +optional
SamplingRatePerMillion *int32 `json:"samplingRatePerMillion,omitempty"`
}

View File

@ -0,0 +1,108 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"context"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/client-go/transport"
"k8s.io/component-base/tracing/api/v1"
)
// NewProvider creates a TracerProvider in a component, and enforces recommended tracing behavior
func NewProvider(ctx context.Context,
tracingConfig *v1.TracingConfiguration,
addedOpts []otlpgrpc.Option,
resourceOpts []resource.Option,
) (oteltrace.TracerProvider, error) {
if tracingConfig == nil {
return oteltrace.NewNoopTracerProvider(), nil
}
opts := append([]otlpgrpc.Option{}, addedOpts...)
if tracingConfig.Endpoint != nil {
opts = append(opts, otlpgrpc.WithEndpoint(*tracingConfig.Endpoint))
}
opts = append(opts, otlpgrpc.WithInsecure())
driver := otlpgrpc.NewDriver(opts...)
exporter, err := otlp.NewExporter(ctx, driver)
if err != nil {
return nil, err
}
res, err := resource.New(ctx, resourceOpts...)
if err != nil {
return nil, err
}
// sampler respects parent span's sampling rate or
// otherwise never samples.
sampler := sdktrace.NeverSample()
// Or, emit spans for a fraction of transactions
if tracingConfig.SamplingRatePerMillion != nil && *tracingConfig.SamplingRatePerMillion > 0 {
sampler = sdktrace.TraceIDRatioBased(float64(*tracingConfig.SamplingRatePerMillion) / float64(1000000))
}
// batch span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(exporter)
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.ParentBased(sampler)),
sdktrace.WithSpanProcessor(bsp),
sdktrace.WithResource(res),
)
return tp, nil
}
// WithTracing adds tracing to requests if the incoming request is sampled
func WithTracing(handler http.Handler, tp oteltrace.TracerProvider, serviceName string) http.Handler {
opts := []otelhttp.Option{
otelhttp.WithPropagators(Propagators()),
otelhttp.WithTracerProvider(tp),
}
// With Noop TracerProvider, the otelhttp still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
return otelhttp.NewHandler(handler, serviceName, opts...)
}
// WrapperFor can be used to add tracing to a *rest.Config.
// Example usage:
// tp := NewProvider(...)
// config, _ := rest.InClusterConfig()
// config.Wrap(WrapperFor(&tp))
// kubeclient, _ := clientset.NewForConfig(config)
func WrapperFor(tp oteltrace.TracerProvider) transport.WrapperFunc {
return func(rt http.RoundTripper) http.RoundTripper {
opts := []otelhttp.Option{
otelhttp.WithPropagators(Propagators()),
otelhttp.WithTracerProvider(tp),
}
// With Noop TracerProvider, the otelhttp still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
return otelhttp.NewTransport(rt, opts...)
}
}
// Propagators returns the recommended set of propagators.
func Propagators() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
}

View File

@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logsapi "k8s.io/component-base/logs/api/v1" logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
) )
// HairpinMode denotes how the kubelet should configure networking to handle // HairpinMode denotes how the kubelet should configure networking to handle
@ -780,6 +781,11 @@ type KubeletConfiguration struct {
// Default: true // Default: true
// +optional // +optional
RegisterNode *bool `json:"registerNode,omitempty"` RegisterNode *bool `json:"registerNode,omitempty"`
// Tracing specifies the versioned configuration for OpenTelemetry tracing clients.
// See http://kep.k8s.io/2832 for more details.
// +featureGate=KubeletTracing
// +optional
Tracing *tracingapi.TracingConfiguration `json:"tracing,omitempty"`
} }
type KubeletAuthorizationMode string type KubeletAuthorizationMode string

View File

@ -31,6 +31,8 @@ import (
"strings" "strings"
"time" "time"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -298,7 +300,7 @@ func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService
// connection timeout for CRI service connection // connection timeout for CRI service connection
const connectionTimeout = 2 * time.Minute const connectionTimeout = 2 * time.Minute
runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint runtimeEndpoint := framework.TestContext.ContainerRuntimeEndpoint
r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout) r, err := remote.NewRemoteRuntimeService(runtimeEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -34,7 +34,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
client "k8s.io/client-go/kubernetes" client "k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/traces" "k8s.io/component-base/tracing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -84,7 +84,7 @@ endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
// Create a client that creates sampled traces. // Create a client that creates sampled traces.
tp := trace.TracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))) tp := trace.TracerProvider(sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())))
clientConfig.Wrap(traces.WrapperFor(&tp)) clientConfig.Wrap(tracing.WrapperFor(tp))
clientSet, err := client.NewForConfig(clientConfig) clientSet, err := client.NewForConfig(clientConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)