Support otel tracing in cri remote image service

Signed-off-by: Swagat Bora <sbora@amazon.com>
This commit is contained in:
Swagat Bora 2022-09-23 00:33:18 +00:00
parent ed8c302cc6
commit caa83c25ae
6 changed files with 127 additions and 6 deletions

View File

@ -254,7 +254,7 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) error {
var imageService internalapi.ImageManagerService = fakeRemoteRuntime.ImageService
if config.UseHostImageService {
imageService, err = remote.NewRemoteImageService(f.RemoteImageEndpoint, 15*time.Second)
imageService, err = remote.NewRemoteImageService(f.RemoteImageEndpoint, 15*time.Second, oteltrace.NewNoopTracerProvider())
if err != nil {
return fmt.Errorf("Failed to init image service, error: %w", err)
}

View File

@ -22,15 +22,20 @@ import (
"fmt"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
utilfeature "k8s.io/apiserver/pkg/util/feature"
tracing "k8s.io/component-base/tracing"
"k8s.io/klog/v2"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/util"
)
@ -42,7 +47,7 @@ type remoteImageService struct {
}
// NewRemoteImageService creates a new internalapi.ImageManagerService.
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
func NewRemoteImageService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.ImageManagerService, error) {
klog.V(3).InfoS("Connecting to image service", "endpoint", endpoint)
addr, dialer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
@ -52,10 +57,24 @@ func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (in
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr,
dialOpts := []grpc.DialOption{}
dialOpts = append(dialOpts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
tracingOpts := []otelgrpc.Option{
otelgrpc.WithPropagators(tracing.Propagators()),
otelgrpc.WithTracerProvider(tp),
}
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
}
conn, err := grpc.DialContext(ctx, addr, dialOpts...)
if err != nil {
klog.ErrorS(err, "Connect remote image service failed", "address", addr)
return nil, err

View File

@ -0,0 +1,103 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"context"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
oteltrace "go.opentelemetry.io/otel/trace"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/util"
)
func createRemoteImageServiceWithTracerProvider(endpoint string, tp oteltrace.TracerProvider, t *testing.T) internalapi.ImageManagerService {
runtimeService, err := NewRemoteImageService(endpoint, defaultConnectionTimeout, tp)
require.NoError(t, err)
return runtimeService
}
func createRemoteImageServiceWithoutTracerProvider(endpoint string, t *testing.T) internalapi.ImageManagerService {
runtimeService, err := NewRemoteImageService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider())
require.NoError(t, err)
return runtimeService
}
func TestImageServiceSpansWithTP(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletTracing, true)()
fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
defer func() {
fakeRuntime.Stop()
// clear endpoint file
if addr, _, err := util.GetAddressAndDialer(endpoint); err == nil {
if _, err := os.Stat(addr); err == nil {
os.Remove(addr)
}
}
}()
exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)
ctx := context.Background()
imgSvc := createRemoteImageServiceWithTracerProvider(endpoint, tp, t)
imgRef, err := imgSvc.PullImage(&runtimeapi.ImageSpec{Image: "busybox"}, nil, nil)
assert.NoError(t, err)
assert.Equal(t, "busybox", imgRef)
require.NoError(t, err)
err = tp.ForceFlush(ctx)
require.NoError(t, err)
assert.NotEmpty(t, exp.GetSpans())
}
func TestImageServiceSpansWithoutTP(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletTracing, true)()
fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
defer func() {
fakeRuntime.Stop()
// clear endpoint file
if addr, _, err := util.GetAddressAndDialer(endpoint); err == nil {
if _, err := os.Stat(addr); err == nil {
os.Remove(addr)
}
}
}()
exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)
ctx := context.Background()
imgSvc := createRemoteImageServiceWithoutTracerProvider(endpoint, t)
imgRef, err := imgSvc.PullImage(&runtimeapi.ImageSpec{Image: "busybox"}, nil, nil)
assert.NoError(t, err)
assert.Equal(t, "busybox", imgRef)
require.NoError(t, err)
err = tp.ForceFlush(ctx)
require.NoError(t, err)
assert.Empty(t, exp.GetSpans())
}

View File

@ -31,7 +31,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
internalapi "k8s.io/cri-api/pkg/apis"
//kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
apitest "k8s.io/cri-api/pkg/apis/testing"
"k8s.io/kubernetes/pkg/features"
fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"

View File

@ -298,7 +298,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err
}
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
return err
}

View File

@ -310,7 +310,7 @@ func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService
//explicitly specified
imageManagerEndpoint = framework.TestContext.ImageServiceEndpoint
}
i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout)
i, err := remote.NewRemoteImageService(imageManagerEndpoint, connectionTimeout, oteltrace.NewNoopTracerProvider())
if err != nil {
return nil, nil, err
}