Merge pull request #122087 from fatsheep9146/fix-kubelet-trace-broke

fix kubelet trace broke in 1.28
Kubernetes Prow Robot 2024-01-04 17:59:39 +01:00 committed by GitHub
commit 2b1ccec47e
2 changed files with 138 additions and 9 deletions

pkg/kubelet/kubelet.go

@@ -1945,13 +1945,14 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
 		}
 	}
 
-	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
-	// Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted
-	// errors need to be filtered from result and bypass the reasonCache - cancelling the context for
-	// SyncPod is a known and deliberate error, not a generic error.
-	todoCtx := context.TODO()
+	// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
+	// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
+	// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
+	// the context for SyncPod is a known and deliberate error, not a generic error.
+	// Use WithoutCancel instead of a new context.TODO() to propagate trace context
 	// Call the container runtime's SyncPod callback
-	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
+	sctx := context.WithoutCancel(ctx)
+	result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
 	if err := result.Error(); err != nil {
 		// Do not return error if the only failures were pods in backoff
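For context on the new comment above, a minimal standalone sketch (not part of this change; the key type and values are made up for illustration) of the context.WithoutCancel behavior it relies on: values stored in the parent context, such as the trace span started for syncPod, keep flowing to the container runtime calls, while cancellation from the pod worker is dropped. A fresh context.TODO() discarded both, which is what broke trace propagation.

	package main

	import (
		"context"
		"fmt"
	)

	// key is a hypothetical context key standing in for the slot where the
	// OpenTelemetry tracer stores the current span.
	type key struct{}

	func main() {
		// Parent carries a value and can be cancelled, like the pod worker's context.
		parent, cancel := context.WithCancel(context.WithValue(context.Background(), key{}, "syncPod span"))
		cancel()

		detached := context.WithoutCancel(parent)

		fmt.Println(parent.Err())          // context.Canceled
		fmt.Println(detached.Err())        // <nil>: cancellation is not inherited
		fmt.Println(detached.Value(key{})) // "syncPod span": values (trace context) still flow
	}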

pkg/kubelet/kubelet_test.go

@@ -31,6 +31,8 @@ import (
 	"testing"
 	"time"
 
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	"go.opentelemetry.io/otel/sdk/trace/tracetest"
 	oteltrace "go.opentelemetry.io/otel/trace"
 
 	"github.com/golang/mock/gomock"
@@ -69,6 +71,7 @@ import (
 	fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/images"
+	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -3035,8 +3038,8 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
 	return fakeRuntime, endpoint
 }
 
-func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService {
-	runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider())
+func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService {
+	runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp)
 	require.NoError(t, err)
 	return runtimeService
 }
@@ -3074,7 +3077,7 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
 		fakeRuntime.Stop()
 	}()
 	fakeRecorder := &record.FakeRecorder{}
-	rtSvc := createRemoteRuntimeService(endpoint, t)
+	rtSvc := createRemoteRuntimeService(endpoint, t, oteltrace.NewNoopTracerProvider())
 	kubeDep := &Dependencies{
 		Auth:              nil,
 		CAdvisorInterface: cadvisor,
@@ -3157,3 +3160,128 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
 	assert.Nil(t, testMainKubelet.configMapManager, "configmap manager should be nil if kubelet is in standalone mode")
 	assert.Nil(t, testMainKubelet.secretManager, "secret manager should be nil if kubelet is in standalone mode")
 }
+
+func TestSyncPodSpans(t *testing.T) {
+	testKubelet := newTestKubelet(t, false)
+	kubelet := testKubelet.kubelet
+
+	recorder := record.NewFakeRecorder(20)
+	nodeRef := &v1.ObjectReference{
+		Kind:      "Node",
+		Name:      "testNode",
+		UID:       types.UID("testNode"),
+		Namespace: "",
+	}
+	kubelet.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, "TEST", "")
+
+	kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
+		SyncFrequency: metav1.Duration{Duration: time.Minute},
+		ConfigMapAndSecretChangeDetectionStrategy: kubeletconfiginternal.WatchChangeDetectionStrategy,
+		ContainerLogMaxSize:    "10Mi",
+		ContainerLogMaxFiles:   5,
+		MemoryThrottlingFactor: utilpointer.Float64(0),
+	}
+
+	exp := tracetest.NewInMemoryExporter()
+	tp := sdktrace.NewTracerProvider(
+		sdktrace.WithSyncer(exp),
+	)
+	kubelet.tracer = tp.Tracer(instrumentationScope)
+
+	fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
+	defer func() {
+		fakeRuntime.Stop()
+	}()
+	runtimeSvc := createRemoteRuntimeService(endpoint, t, tp)
+	kubelet.runtimeService = runtimeSvc
+
+	fakeRuntime.ImageService.SetFakeImageSize(100)
+	fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"})
+	imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp)
+	assert.NoError(t, err)
+
+	kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
+		kubelet.recorder,
+		kubelet.livenessManager,
+		kubelet.readinessManager,
+		kubelet.startupManager,
+		kubelet.rootDirectory,
+		kubelet.machineInfo,
+		kubelet.podWorkers,
+		kubelet.os,
+		kubelet,
+		nil,
+		kubelet.backOff,
+		kubeCfg.SerializeImagePulls,
+		kubeCfg.MaxParallelImagePulls,
+		float32(kubeCfg.RegistryPullQPS),
+		int(kubeCfg.RegistryBurst),
+		"",
+		"",
+		kubeCfg.CPUCFSQuota,
+		kubeCfg.CPUCFSQuotaPeriod,
+		runtimeSvc,
+		imageSvc,
+		kubelet.containerManager,
+		kubelet.containerLogManager,
+		kubelet.runtimeClassManager,
+		false,
+		kubeCfg.MemorySwap.SwapBehavior,
+		kubelet.containerManager.GetNodeAllocatableAbsolute,
+		*kubeCfg.MemoryThrottlingFactor,
+		kubeletutil.NewPodStartupLatencyTracker(),
+		tp,
+	)
+	assert.NoError(t, err)
+
+	pod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{
+		Containers: []v1.Container{
+			{
+				Name:            "bar",
+				Image:           "test:latest",
+				ImagePullPolicy: v1.PullAlways,
+			},
+		},
+		EnableServiceLinks: utilpointer.Bool(false),
+	})
+
+	_, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{})
+	require.NoError(t, err)
+
+	assert.NotEmpty(t, exp.GetSpans())
+
+	// find root span for syncPod
+	var rootSpan *tracetest.SpanStub
+	spans := exp.GetSpans()
+	for i, span := range spans {
+		if span.Name == "syncPod" {
+			rootSpan = &spans[i]
+			break
+		}
+	}
+	assert.NotNil(t, rootSpan)
+
+	imageServiceSpans := make([]tracetest.SpanStub, 0)
+	runtimeServiceSpans := make([]tracetest.SpanStub, 0)
+	for _, span := range exp.GetSpans() {
+		if span.SpanContext.TraceID() == rootSpan.SpanContext.TraceID() {
+			switch {
+			case strings.HasPrefix(span.Name, "runtime.v1.ImageService"):
+				imageServiceSpans = append(imageServiceSpans, span)
+			case strings.HasPrefix(span.Name, "runtime.v1.RuntimeService"):
+				runtimeServiceSpans = append(runtimeServiceSpans, span)
+			}
+		}
+	}
+	assert.NotEmpty(t, imageServiceSpans, "syncPod trace should have image service spans")
+	assert.NotEmpty(t, runtimeServiceSpans, "syncPod trace should have runtime service spans")
+
+	for _, span := range imageServiceSpans {
+		assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("image service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
+	}
+	for _, span := range runtimeServiceSpans {
+		assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
+	}
+}
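For reference, a minimal standalone sketch (assumed span names, not taken from the kubelet) of the tracetest pattern the new test relies on: spans recorded synchronously by an in-memory exporter can be checked for parent/child relationships by comparing a span's Parent.SpanID() against the root span's SpanContext.SpanID(), which is the same check TestSyncPodSpans performs against the syncPod root span.

	package main

	import (
		"context"
		"fmt"

		sdktrace "go.opentelemetry.io/otel/sdk/trace"
		"go.opentelemetry.io/otel/sdk/trace/tracetest"
	)

	func main() {
		// Record every span synchronously into memory, as the test does with WithSyncer.
		exp := tracetest.NewInMemoryExporter()
		tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exp))
		tracer := tp.Tracer("example")

		// "root" stands in for syncPod; "child" for a runtime or image service RPC span.
		ctx, root := tracer.Start(context.Background(), "root")
		_, child := tracer.Start(ctx, "child")
		child.End()
		root.End()

		spans := exp.GetSpans() // exported in End() order: child first, then root
		fmt.Println(spans[0].Parent.SpanID() == spans[1].SpanContext.SpanID())        // true: child is parented to root
		fmt.Println(spans[0].SpanContext.TraceID() == spans[1].SpanContext.TraceID()) // true: same trace
	}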