From 90fc108f6da37b04d9c63051f752cc6f232810a4 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 28 Nov 2023 20:02:23 +0800 Subject: [PATCH 1/7] fix kubelet trace broke in 1.28 Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 62083b62db4..0ea8b69805e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1960,9 +1960,11 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted // errors need to be filtered from result and bypass the reasonCache - cancelling the context for // SyncPod is a known and deliberate error, not a generic error. - todoCtx := context.TODO() // Call the container runtime's SyncPod callback - result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff) + result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff) + if wait.Interrupted(result.Error()) { + return false, err + } kl.reasonCache.Update(pod.UID, result) if err := result.Error(); err != nil { // Do not return error if the only failures were pods in backoff From 6d92b330e31fc580c73c31f1f82eacdb4c9bd2bd Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 28 Nov 2023 23:43:33 +0800 Subject: [PATCH 2/7] use ctx withoutcancel Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0ea8b69805e..2c1b99e7624 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1961,7 +1961,8 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // errors need to be filtered from result and bypass the reasonCache - cancelling the context for // SyncPod is a known and deliberate error, not a generic error. 
// Call the container runtime's SyncPod callback - result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff) + sctx := context.WithoutCancel(ctx) + result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff) if wait.Interrupted(result.Error()) { return false, err } From f035fea63ab4c43ceb608f566752ef4bba94cabd Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Wed, 29 Nov 2023 09:59:53 +0800 Subject: [PATCH 3/7] remove unnecessary code Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 2c1b99e7624..4c82febb311 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1963,9 +1963,6 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // Call the container runtime's SyncPod callback sctx := context.WithoutCancel(ctx) result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff) - if wait.Interrupted(result.Error()) { - return false, err - } kl.reasonCache.Update(pod.UID, result) if err := result.Error(); err != nil { // Do not return error if the only failures were pods in backoff From 24c4d6f7c9ca034d0b6234547b44ac006d325099 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sat, 2 Dec 2023 17:09:01 +0800 Subject: [PATCH 4/7] add unit test for kubelet trace Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet_test.go | 126 +++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 09de07fa501..108f3b5d81a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -31,6 +31,9 @@ import ( "testing" "time" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace" "github.com/golang/mock/gomock" @@ -69,6 +72,7 @@ import ( 
fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake" "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/images" + "k8s.io/kubernetes/pkg/kubelet/kuberuntime" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/network/dns" @@ -3100,8 +3104,8 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s return fakeRuntime, endpoint } -func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService { - runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider()) +func createRemoteRuntimeService(endpoint string, t *testing.T, tp trace.TracerProvider) internalapi.RuntimeService { + runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp) require.NoError(t, err) return runtimeService } @@ -3139,7 +3143,7 @@ func TestNewMainKubeletStandAlone(t *testing.T) { fakeRuntime.Stop() }() fakeRecorder := &record.FakeRecorder{} - rtSvc := createRemoteRuntimeService(endpoint, t) + rtSvc := createRemoteRuntimeService(endpoint, t, oteltrace.NewNoopTracerProvider()) kubeDep := &Dependencies{ Auth: nil, CAdvisorInterface: cadvisor, @@ -3222,3 +3226,119 @@ func TestNewMainKubeletStandAlone(t *testing.T) { assert.Nil(t, testMainKubelet.configMapManager, "configmap manager should be nil if kubelet is in standalone mode") assert.Nil(t, testMainKubelet.secretManager, "secret manager should be nil if kubelet is in standalone mode") } + +func TestSyncPodSpans(t *testing.T) { + testKubelet := newTestKubelet(t, false) + kubelet := testKubelet.kubelet + + recorder := record.NewFakeRecorder(20) + nodeRef := &v1.ObjectReference{ + Kind: "Node", + Name: "testNode", + UID: types.UID("testNode"), + Namespace: "", + } + kubelet.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, "TEST", "") + + kubeCfg := &kubeletconfiginternal.KubeletConfiguration{ + SyncFrequency: 
metav1.Duration{Duration: time.Minute}, + ConfigMapAndSecretChangeDetectionStrategy: kubeletconfiginternal.WatchChangeDetectionStrategy, + ContainerLogMaxSize: "10Mi", + ContainerLogMaxFiles: 5, + MemoryThrottlingFactor: utilpointer.Float64(0), + } + + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + kubelet.tracer = tp.Tracer(instrumentationScope) + + fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t) + defer func() { + fakeRuntime.Stop() + }() + runtimeSvc := createRemoteRuntimeService(endpoint, t, tp) + kubelet.runtimeService = runtimeSvc + + fakeRuntime.ImageService.SetFakeImageSize(100) + fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"}) + imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp) + + kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( + kubelet.recorder, + kubelet.livenessManager, + kubelet.readinessManager, + kubelet.startupManager, + kubelet.rootDirectory, + kubelet.machineInfo, + kubelet.podWorkers, + kubelet.os, + kubelet, + nil, + kubelet.backOff, + kubeCfg.SerializeImagePulls, + kubeCfg.MaxParallelImagePulls, + float32(kubeCfg.RegistryPullQPS), + int(kubeCfg.RegistryBurst), + "", + "", + kubeCfg.CPUCFSQuota, + kubeCfg.CPUCFSQuotaPeriod, + runtimeSvc, + imageSvc, + kubelet.containerManager, + kubelet.containerLogManager, + kubelet.runtimeClassManager, + false, + kubeCfg.MemorySwap.SwapBehavior, + kubelet.containerManager.GetNodeAllocatableAbsolute, + *kubeCfg.MemoryThrottlingFactor, + kubeletutil.NewPodStartupLatencyTracker(), + tp, + ) + + pod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "bar", + Image: "test:latest", + ImagePullPolicy: v1.PullAlways, + }, + }, + EnableServiceLinks: utilpointer.Bool(false), + }) + + _, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{}) + require.NoError(t, err) + + err 
= tp.ForceFlush(context.Background()) + require.NoError(t, err) + assert.NotEmpty(t, exp.GetSpans()) + + // find root span for syncPod + var rootSpan *tracetest.SpanStub + spans := exp.GetSpans() + for i, span := range spans { + if span.Name == "syncPod" { + rootSpan = &spans[i] + break + } + } + assert.NotNil(t, rootSpan) + + imageServiceSpans := make([]tracetest.SpanStub, 0) + runtimeServiceSpans := make([]tracetest.SpanStub, 0) + for _, span := range exp.GetSpans() { + if span.SpanContext.TraceID() == rootSpan.SpanContext.TraceID() && span.Parent.SpanID() == rootSpan.SpanContext.SpanID() { + switch { + case strings.HasPrefix(span.Name, "runtime.v1.ImageService"): + imageServiceSpans = append(imageServiceSpans, span) + case strings.HasPrefix(span.Name, "runtime.v1.RuntimeService"): + runtimeServiceSpans = append(runtimeServiceSpans, span) + } + } + } + assert.NotEmpty(t, imageServiceSpans) + assert.NotEmpty(t, runtimeServiceSpans) +} From 69c40a339650189d1c7c0996b8e47099c8a55c6b Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sat, 2 Dec 2023 19:37:49 +0800 Subject: [PATCH 5/7] fix lints Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 108f3b5d81a..2843e0129c7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -33,7 +33,6 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace" "github.com/golang/mock/gomock" @@ -3104,7 +3103,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s return fakeRuntime, endpoint } -func createRemoteRuntimeService(endpoint string, t *testing.T, tp trace.TracerProvider) internalapi.RuntimeService { +func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService 
{ runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp) require.NoError(t, err) return runtimeService @@ -3264,6 +3263,7 @@ func TestSyncPodSpans(t *testing.T) { fakeRuntime.ImageService.SetFakeImageSize(100) fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"}) imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp) + assert.NoError(t, err) kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager( kubelet.recorder, @@ -3297,6 +3297,7 @@ func TestSyncPodSpans(t *testing.T) { kubeletutil.NewPodStartupLatencyTracker(), tp, ) + assert.NoError(t, err) pod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{ Containers: []v1.Container{ From 51495bb4c597ac120abf14edcf670ad27e2e536c Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 5 Dec 2023 08:02:03 +0800 Subject: [PATCH 6/7] optimize the unit test Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet_test.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 2843e0129c7..0f0971c3798 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -3249,7 +3249,7 @@ func TestSyncPodSpans(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exp), + sdktrace.WithSyncer(exp), ) kubelet.tracer = tp.Tracer(instrumentationScope) @@ -3313,7 +3313,6 @@ func TestSyncPodSpans(t *testing.T) { _, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{}) require.NoError(t, err) - err = tp.ForceFlush(context.Background()) require.NoError(t, err) assert.NotEmpty(t, exp.GetSpans()) @@ -3331,7 +3330,7 @@ func TestSyncPodSpans(t *testing.T) { imageServiceSpans := make([]tracetest.SpanStub, 0) runtimeServiceSpans := make([]tracetest.SpanStub, 0) for _, span := range exp.GetSpans() { - if span.SpanContext.TraceID() == 
rootSpan.SpanContext.TraceID() && span.Parent.SpanID() == rootSpan.SpanContext.SpanID() { + if span.SpanContext.TraceID() == rootSpan.SpanContext.TraceID() { switch { case strings.HasPrefix(span.Name, "runtime.v1.ImageService"): imageServiceSpans = append(imageServiceSpans, span) @@ -3340,6 +3339,14 @@ func TestSyncPodSpans(t *testing.T) { } } } - assert.NotEmpty(t, imageServiceSpans) - assert.NotEmpty(t, runtimeServiceSpans) + assert.NotEmpty(t, imageServiceSpans, "syncPod trace should have image service spans") + assert.NotEmpty(t, runtimeServiceSpans, "syncPod trace should have runtime service spans") + + for _, span := range imageServiceSpans { + assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("image service span %s %s should be child of root span", span.Name, span.Parent.SpanID())) + } + + for _, span := range runtimeServiceSpans { + assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID())) + } } From e92501b8fc67fbd315ec8175e316c262122e83f8 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 5 Dec 2023 22:46:27 +0800 Subject: [PATCH 7/7] optimize the comment Signed-off-by: Ziqi Zhao --- pkg/kubelet/kubelet.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4c82febb311..43e26b29d7d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1956,10 +1956,11 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } } - // TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker. - // Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted - // errors need to be filtered from result and bypass the reasonCache - cancelling the context for - // SyncPod is a known and deliberate error, not a generic error. 
+ // TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker. + // Currently, using cancellation from that context causes test failures. To remove this WithoutCancel, + // any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling + // the context for SyncPod is a known and deliberate error, not a generic error. + // Use WithoutCancel instead of a fresh context.TODO() so the trace context is still propagated. // Call the container runtime's SyncPod callback sctx := context.WithoutCancel(ctx) result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)