diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index 56ea44eef64..b3d347e35fd 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -254,7 +254,7 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) error { var imageService internalapi.ImageManagerService = fakeRemoteRuntime.ImageService if config.UseHostImageService { - imageService, err = remote.NewRemoteImageService(f.RemoteImageEndpoint, 15*time.Second) + imageService, err = remote.NewRemoteImageService(f.RemoteImageEndpoint, 15*time.Second, oteltrace.NewNoopTracerProvider()) if err != nil { return fmt.Errorf("Failed to init image service, error: %w", err) } diff --git a/pkg/kubelet/cri/remote/remote_image.go b/pkg/kubelet/cri/remote/remote_image.go index 73236896d64..299f4644430 100644 --- a/pkg/kubelet/cri/remote/remote_image.go +++ b/pkg/kubelet/cri/remote/remote_image.go @@ -22,15 +22,20 @@ import ( "fmt" "time" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + utilfeature "k8s.io/apiserver/pkg/util/feature" + tracing "k8s.io/component-base/tracing" "k8s.io/klog/v2" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/util" ) @@ -42,7 +47,7 @@ type remoteImageService struct { } // NewRemoteImageService creates a new internalapi.ImageManagerService. -func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) { +func NewRemoteImageService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.ImageManagerService, error) { klog.V(3).InfoS("Connecting to image service", "endpoint", endpoint) addr, dialer, err := util.GetAddressAndDialer(endpoint) if err != nil { @@ -52,10 +57,24 @@ func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (in ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) defer cancel() - conn, err := grpc.DialContext(ctx, addr, + dialOpts := []grpc.DialOption{} + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) + if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) { + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithPropagators(tracing.Propagators()), + otelgrpc.WithTracerProvider(tp), + } + // Even if there is no TracerProvider, the otelgrpc still handles context propagation. + // See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough + dialOpts = append(dialOpts, + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))) + } + + conn, err := grpc.DialContext(ctx, addr, dialOpts...) if err != nil { klog.ErrorS(err, "Connect remote image service failed", "address", addr) return nil, err diff --git a/pkg/kubelet/cri/remote/remote_image_test.go b/pkg/kubelet/cri/remote/remote_image_test.go new file mode 100644 index 00000000000..1944687e23e --- /dev/null +++ b/pkg/kubelet/cri/remote/remote_image_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + oteltrace "go.opentelemetry.io/otel/trace" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + internalapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/util" +) + +func createRemoteImageServiceWithTracerProvider(endpoint string, tp oteltrace.TracerProvider, t *testing.T) internalapi.ImageManagerService { + runtimeService, err := NewRemoteImageService(endpoint, defaultConnectionTimeout, tp) + require.NoError(t, err) + + return runtimeService +} + +func createRemoteImageServiceWithoutTracerProvider(endpoint string, t *testing.T) internalapi.ImageManagerService { + runtimeService, err := NewRemoteImageService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider()) + require.NoError(t, err) + + return runtimeService +} + +func TestImageServiceSpansWithTP(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletTracing, true)() + fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t) + defer func() { + fakeRuntime.Stop() + // clear endpoint file + if addr, _, err := util.GetAddressAndDialer(endpoint); err == nil { + if _, err := os.Stat(addr); err == nil { + os.Remove(addr) + } + } + }() + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + ctx := context.Background() + imgSvc := createRemoteImageServiceWithTracerProvider(endpoint, tp, t) + imgRef, err := imgSvc.PullImage(&runtimeapi.ImageSpec{Image: "busybox"}, nil, nil) + assert.NoError(t, err) + assert.Equal(t, "busybox", imgRef) + require.NoError(t, err) + err = tp.ForceFlush(ctx) + require.NoError(t, err) + assert.NotEmpty(t, exp.GetSpans()) +} + +func TestImageServiceSpansWithoutTP(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletTracing, true)() + fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t) + defer func() { + fakeRuntime.Stop() + // clear endpoint file + if addr, _, err := util.GetAddressAndDialer(endpoint); err == nil { + if _, err := os.Stat(addr); err == nil { + os.Remove(addr) + } + } + }() + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + ctx := context.Background() + imgSvc := createRemoteImageServiceWithoutTracerProvider(endpoint, t) + imgRef, err := imgSvc.PullImage(&runtimeapi.ImageSpec{Image: "busybox"}, nil, nil) + assert.NoError(t, err) + assert.Equal(t, "busybox", imgRef) + require.NoError(t, err) + err = tp.ForceFlush(ctx) + require.NoError(t, err) + assert.Empty(t, exp.GetSpans()) +} diff --git a/pkg/kubelet/cri/remote/remote_runtime_test.go b/pkg/kubelet/cri/remote/remote_runtime_test.go index cfe4aadbfd9..5219c1ca8b1 100644 --- a/pkg/kubelet/cri/remote/remote_runtime_test.go +++ b/pkg/kubelet/cri/remote/remote_runtime_test.go @@ -31,7 +31,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" internalapi "k8s.io/cri-api/pkg/apis" - //kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1" apitest "k8s.io/cri-api/pkg/apis/testing" "k8s.io/kubernetes/pkg/features" fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake" diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0136d69950c..1582e5c2431 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -298,7 +298,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil { return err } - if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil { + if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil { return err } diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index 5200815c753..e9735eaace8 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -310,7 +310,7 @@ func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService //explicitly specified imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint } - i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout) + i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider()) if err != nil { return nil, nil, err }