diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index d2483bd1187..e2844472499 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1945,13 +1945,14 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
 		}
 	}
 
-	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
-	// Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted
-	// errors need to be filtered from result and bypass the reasonCache - cancelling the context for
-	// SyncPod is a known and deliberate error, not a generic error.
-	todoCtx := context.TODO()
+	// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
+	// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
+	// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
+	// the context for SyncPod is a known and deliberate error, not a generic error.
+	// Use WithoutCancel instead of a new context.TODO() to propagate trace context.
 	// Call the container runtime's SyncPod callback
-	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
+	sctx := context.WithoutCancel(ctx)
+	result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
 	if err := result.Error(); err != nil {
 		// Do not return error if the only failures were pods in backoff
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index dc3944d7459..b89090797ad 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -31,6 +31,8 @@ import (
 	"testing"
 	"time"
 
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
 	oteltrace "go.opentelemetry.io/otel/trace"
 
 	"github.com/golang/mock/gomock"
@@ -69,6 +71,7 @@ import (
 	fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/images"
+	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -3035,8 +3038,8 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
 	return fakeRuntime, endpoint
 }
 
-func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService {
-	runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider())
+func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService {
+	runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp)
 	require.NoError(t, err)
 	return runtimeService
 }
@@ -3074,7 +3077,7 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
 		fakeRuntime.Stop()
 	}()
 	fakeRecorder := &record.FakeRecorder{}
-	rtSvc := createRemoteRuntimeService(endpoint, t)
+	rtSvc := createRemoteRuntimeService(endpoint, t, oteltrace.NewNoopTracerProvider())
 	kubeDep := &Dependencies{
 		Auth:              nil,
 		CAdvisorInterface: cadvisor,
@@ -3157,3 +3160,127 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
 	assert.Nil(t, testMainKubelet.configMapManager, "configmap manager should be nil if kubelet is in standalone mode")
 	assert.Nil(t, testMainKubelet.secretManager, "secret manager should be nil if kubelet is in standalone mode")
 }
+
+func TestSyncPodSpans(t *testing.T) {
+	testKubelet := newTestKubelet(t, false)
+	kubelet := testKubelet.kubelet
+
+	recorder := record.NewFakeRecorder(20)
+	nodeRef := &v1.ObjectReference{
+		Kind:      "Node",
+		Name:      "testNode",
+		UID:       types.UID("testNode"),
+		Namespace: "",
+	}
+	kubelet.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, "TEST", "")
+
+	kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
+		SyncFrequency: metav1.Duration{Duration: time.Minute},
+		ConfigMapAndSecretChangeDetectionStrategy: kubeletconfiginternal.WatchChangeDetectionStrategy,
+		ContainerLogMaxSize:    "10Mi",
+		ContainerLogMaxFiles:   5,
+		MemoryThrottlingFactor: utilpointer.Float64(0),
+	}
+
+	exp := tracetest.NewInMemoryExporter()
+	tp := sdktrace.NewTracerProvider(
+		sdktrace.WithSyncer(exp),
+	)
+	kubelet.tracer = tp.Tracer(instrumentationScope)
+
+	fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
+	defer func() {
+		fakeRuntime.Stop()
+	}()
+	runtimeSvc := createRemoteRuntimeService(endpoint, t, tp)
+	kubelet.runtimeService = runtimeSvc
+
+	fakeRuntime.ImageService.SetFakeImageSize(100)
+	fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"})
+	imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp)
+	assert.NoError(t, err)
+
+	kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
+		kubelet.recorder,
+		kubelet.livenessManager,
+		kubelet.readinessManager,
+		kubelet.startupManager,
+		kubelet.rootDirectory,
+		kubelet.machineInfo,
+		kubelet.podWorkers,
+		kubelet.os,
+		kubelet,
+		nil,
+		kubelet.backOff,
+		kubeCfg.SerializeImagePulls,
+		kubeCfg.MaxParallelImagePulls,
+		float32(kubeCfg.RegistryPullQPS),
+		int(kubeCfg.RegistryBurst),
+		"",
+		"",
+		kubeCfg.CPUCFSQuota,
+		kubeCfg.CPUCFSQuotaPeriod,
+		runtimeSvc,
+		imageSvc,
+		kubelet.containerManager,
+		kubelet.containerLogManager,
+		kubelet.runtimeClassManager,
+		false,
+		kubeCfg.MemorySwap.SwapBehavior,
+		kubelet.containerManager.GetNodeAllocatableAbsolute,
+		*kubeCfg.MemoryThrottlingFactor,
+		kubeletutil.NewPodStartupLatencyTracker(),
+		tp,
+	)
+	assert.NoError(t, err)
+
+	pod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{
+		Containers: []v1.Container{
+			{
+				Name:            "bar",
+				Image:           "test:latest",
+				ImagePullPolicy: v1.PullAlways,
+			},
+		},
+		EnableServiceLinks: utilpointer.Bool(false),
+	})
+
+	_, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{})
+	require.NoError(t, err)
+
+	assert.NotEmpty(t, exp.GetSpans())
+
+	// find root span for syncPod
+	var rootSpan *tracetest.SpanStub
+	spans := exp.GetSpans()
+	for i, span := range spans {
+		if span.Name == "syncPod" {
+			rootSpan = &spans[i]
+			break
+		}
+	}
+	assert.NotNil(t, rootSpan)
+
+	imageServiceSpans := make([]tracetest.SpanStub, 0)
+	runtimeServiceSpans := make([]tracetest.SpanStub, 0)
+	for _, span := range exp.GetSpans() {
+		if span.SpanContext.TraceID() == rootSpan.SpanContext.TraceID() {
+			switch {
+			case strings.HasPrefix(span.Name, "runtime.v1.ImageService"):
+				imageServiceSpans = append(imageServiceSpans, span)
+			case strings.HasPrefix(span.Name, "runtime.v1.RuntimeService"):
+				runtimeServiceSpans = append(runtimeServiceSpans, span)
+			}
+		}
+	}
+	assert.NotEmpty(t, imageServiceSpans, "syncPod trace should have image service spans")
+	assert.NotEmpty(t, runtimeServiceSpans, "syncPod trace should have runtime service spans")
+
+	for _, span := range imageServiceSpans {
+		assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("image service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
+	}
+
+	for _, span := range runtimeServiceSpans {
+		assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
+	}
+}
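A note on the kubelet.go hunk: context.WithoutCancel (Go 1.21+) returns a context that keeps the parent's values - including any OpenTelemetry span context stored in them - but never reports the parent's cancellation or deadline. That is exactly the trade the TODO describes: trace context now flows from the pod worker into the CRI calls, while a cancelled worker still cannot interrupt SyncPod mid-flight. A minimal standalone sketch of that behavior (the key type is a hypothetical stand-in for the otel span-context key, not kubelet code):

package main

import (
	"context"
	"fmt"
)

// key is a hypothetical stand-in for the value key under which
// OpenTelemetry stores the active span context.
type key struct{}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	parent = context.WithValue(parent, key{}, "trace-id-1234")

	// Detach cancellation but keep values, as the patched SyncPod does.
	detached := context.WithoutCancel(parent)
	cancel() // simulate the pod worker cancelling the sync

	fmt.Println(parent.Err())          // context canceled
	fmt.Println(detached.Err())        // <nil> - cancellation not inherited
	fmt.Println(detached.Value(key{})) // trace-id-1234 - values still flow
}

Compared with the old context.TODO(), the only difference visible to the runtime client is that Value lookups - and therefore trace propagation through the gRPC tracing instrumentation - now succeed.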
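On the test side, the mechanism is the OpenTelemetry SDK's in-memory exporter wired through a synchronous processor: sdktrace.WithSyncer exports each span the moment it ends, so exp.GetSpans() is complete without any flush or shutdown step. A condensed, self-contained sketch of the same parent/child assertion TestSyncPodSpans performs (tracer and span names here are illustrative, not the kubelet's):

package main

import (
	"context"
	"fmt"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func main() {
	exp := tracetest.NewInMemoryExporter()
	// WithSyncer exports synchronously on span End - convenient for
	// tests, too slow for production (use WithBatcher there).
	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exp))
	tracer := tp.Tracer("test")

	ctx, root := tracer.Start(context.Background(), "syncPod")
	_, child := tracer.Start(ctx, "runtime.v1.RuntimeService/RunPodSandbox")
	child.End()
	root.End()

	// Each SpanStub records its parent's SpanContext; comparing
	// span.Parent.SpanID() against the root's SpanID is how the test
	// verifies the CRI spans are stitched into the syncPod trace.
	for _, s := range exp.GetSpans() {
		fmt.Println(s.Name, "parent:", s.Parent.SpanID())
	}
}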