diff --git a/hack/verify-flags/excluded-flags.txt b/hack/verify-flags/excluded-flags.txt
index 687e24ff7be..7cf492d6163 100644
--- a/hack/verify-flags/excluded-flags.txt
+++ b/hack/verify-flags/excluded-flags.txt
@@ -24,3 +24,4 @@ file_content_in_loop
 break_on_expected_content
 Premium_LRS
 VCP_STRESS_ITERATIONS
+update_type
diff --git a/pkg/kubelet/images/image_gc_manager.go b/pkg/kubelet/images/image_gc_manager.go
index d284ed32f13..451dfb08390 100644
--- a/pkg/kubelet/images/image_gc_manager.go
+++ b/pkg/kubelet/images/image_gc_manager.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"go.opentelemetry.io/otel/trace"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 
@@ -38,6 +39,9 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 )
 
+// instrumentationScope is the OpenTelemetry instrumentation scope name
+const instrumentationScope = "k8s.io/kubernetes/pkg/kubelet/images"
+
 // StatsProvider is an interface for fetching stats used during image garbage
 // collection.
 type StatsProvider interface {
@@ -104,6 +108,9 @@ type realImageGCManager struct {
 
 	// sandbox image exempted from GC
 	sandboxImage string
+
+	// tracer for recording spans
+	tracer trace.Tracer
 }
 
 // imageCache caches latest result of ListImages.
@@ -153,7 +160,7 @@ type imageRecord struct {
 }
 
 // NewImageGCManager instantiates a new ImageGCManager object.
-func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, sandboxImage string) (ImageGCManager, error) {
+func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, sandboxImage string, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
 	// Validate policy.
 	if policy.HighThresholdPercent < 0 || policy.HighThresholdPercent > 100 {
 		return nil, fmt.Errorf("invalid HighThresholdPercent %d, must be in range [0-100]", policy.HighThresholdPercent)
@@ -164,6 +171,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
 	if policy.LowThresholdPercent > policy.HighThresholdPercent {
 		return nil, fmt.Errorf("LowThresholdPercent %d can not be higher than HighThresholdPercent %d", policy.LowThresholdPercent, policy.HighThresholdPercent)
 	}
+	tracer := tracerProvider.Tracer(instrumentationScope)
 	im := &realImageGCManager{
 		runtime:       runtime,
 		policy:        policy,
@@ -173,6 +181,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
 		nodeRef:       nodeRef,
 		initialized:   false,
 		sandboxImage:  sandboxImage,
+		tracer:        tracer,
 	}
 
 	return im, nil
@@ -279,6 +288,8 @@ func (im *realImageGCManager) detectImages(ctx context.Context, detectTime time.
 }
 
 func (im *realImageGCManager) GarbageCollect(ctx context.Context) error {
+	ctx, otelSpan := im.tracer.Start(ctx, "Images/GarbageCollect")
+	defer otelSpan.End()
 	// Get disk usage on disk holding images.
 	fsStats, err := im.statsProvider.ImageFsStats(ctx)
 	if err != nil {
diff --git a/pkg/kubelet/images/image_gc_manager_test.go b/pkg/kubelet/images/image_gc_manager_test.go
index a2dbf7c8d7d..2bb6061a125 100644
--- a/pkg/kubelet/images/image_gc_manager_test.go
+++ b/pkg/kubelet/images/image_gc_manager_test.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	oteltrace "go.opentelemetry.io/otel/trace"
 	"k8s.io/client-go/tools/record"
 	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 	"k8s.io/kubernetes/pkg/kubelet/container"
@@ -48,6 +49,7 @@ func newRealImageGCManager(policy ImageGCPolicy, mockStatsProvider stats.Provide
 		statsProvider: mockStatsProvider,
 		recorder:      &record.FakeRecorder{},
 		sandboxImage:  sandboxImage,
+		tracer:        oteltrace.NewNoopTracerProvider().Tracer(""),
 	}, fakeRuntime
 }
 
@@ -543,7 +545,7 @@ func TestGarbageCollectBelowLowThreshold(t *testing.T) {
 	manager, _ := newRealImageGCManager(policy, mockStatsProvider)
 
 	// Expect 40% usage.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(600),
 		CapacityBytes:  uint64Ptr(1000),
 	}, nil)
@@ -562,7 +564,7 @@ func TestGarbageCollectCadvisorFailure(t *testing.T) {
 	mockStatsProvider := statstest.NewMockProvider(mockCtrl)
 	manager, _ := newRealImageGCManager(policy, mockStatsProvider)
 
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{}, fmt.Errorf("error"))
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{}, fmt.Errorf("error"))
 	assert.NotNil(t, manager.GarbageCollect(ctx))
 }
 
@@ -579,7 +581,7 @@ func TestGarbageCollectBelowSuccess(t *testing.T) {
 	manager, fakeRuntime := newRealImageGCManager(policy, mockStatsProvider)
 
 	// Expect 95% usage and most of it gets freed.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(50),
 		CapacityBytes:  uint64Ptr(1000),
 	}, nil)
@@ -602,7 +604,7 @@ func TestGarbageCollectNotEnoughFreed(t *testing.T) {
 	manager, fakeRuntime := newRealImageGCManager(policy, mockStatsProvider)
 
 	// Expect 95% usage and little of it gets freed.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(50),
 		CapacityBytes:  uint64Ptr(1000),
 	}, nil)
@@ -717,7 +719,7 @@ func TestValidateImageGCPolicy(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, ""); err != nil {
+		if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, "", oteltrace.NewNoopTracerProvider()); err != nil {
 			if err.Error() != tc.expectErr {
 				t.Errorf("[%s:]Expected err:%v, but got:%v", tc.name, tc.expectErr, err.Error())
 			}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index bcdcde29bd6..b217d204413 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -37,6 +37,7 @@ import (
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"k8s.io/mount-utils"
 
@@ -198,6 +199,9 @@ const (
 
 	// nodeLeaseRenewIntervalFraction is the fraction of lease duration to renew the lease
 	nodeLeaseRenewIntervalFraction = 0.25
+
+	// instrumentationScope is the name of the OpenTelemetry instrumentation scope
+	instrumentationScope = "k8s.io/kubernetes/pkg/kubelet"
 )
 
 var (
@@ -511,6 +515,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		insecureContainerLifecycleHTTPClient.CheckRedirect = httpprobe.RedirectChecker(false)
 	}
 
+	tracer := kubeDeps.TracerProvider.Tracer(instrumentationScope)
+
 	klet := &Kubelet{
 		hostname:           hostname,
 		hostnameOverridden: hostnameOverridden,
@@ -561,6 +567,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
 		keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
 		nodeStatusMaxImages:                     nodeStatusMaxImages,
+		tracer:                                  tracer,
 	}
 
 	if klet.cloud != nil {
@@ -677,6 +684,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
 		*kubeCfg.MemoryThrottlingFactor,
 		kubeDeps.PodStartupLatencyTracker,
+		kubeDeps.TracerProvider,
 	)
 	if err != nil {
 		return nil, err
@@ -760,7 +768,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
 
 	// setup imageManager
-	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
+	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage, kubeDeps.TracerProvider)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 	}
@@ -1252,6 +1260,9 @@ type Kubelet struct {
 
 	// Mutex to serialize new pod admission and existing pod resizing
 	podResizeMutex sync.Mutex
+
+	// OpenTelemetry Tracer
+	tracer trace.Tracer
 }
 
 // ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@@ -1612,10 +1623,17 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
 	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
 	// Currently, using that context causes test failures.
-	ctx := context.TODO()
+	ctx, otelSpan := kl.tracer.Start(context.TODO(), "syncPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.pod.update_type", updateType.String()),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
 	klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer func() {
 		klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
+		otelSpan.End()
 	}()
 
 	// Latency measurements for the main workflow are relative to the
@@ -1882,7 +1900,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
 	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
 	// Currently, using that context causes test failures.
-	ctx := context.Background()
+	ctx, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatingPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
+	defer otelSpan.End()
 	klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
 
@@ -2004,6 +2028,13 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
 // This typically occurs when a pod is force deleted from configuration (local disk or API) and the
 // kubelet restarts in the middle of the action.
 func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+	_, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatedPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
+	defer otelSpan.End()
 	klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
 
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 20bf2f9fee8..8f28d535137 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -253,6 +253,7 @@ func newTestKubeletWithImageList(
 	kubelet.cadvisor = &cadvisortest.Fake{}
 	machineInfo, _ := kubelet.cadvisor.MachineInfo()
 	kubelet.setCachedMachineInfo(machineInfo)
+	kubelet.tracer = oteltrace.NewNoopTracerProvider().Tracer("")
 
 	fakeMirrorClient := podtest.NewFakeMirrorClient()
 	secretManager := secret.NewSimpleSecretManager(kubelet.kubeClient)
@@ -305,7 +306,7 @@ func newTestKubeletWithImageList(
 		HighThresholdPercent: 90,
 		LowThresholdPercent:  80,
 	}
-	imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, "")
+	imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, "", oteltrace.NewNoopTracerProvider())
 	assert.NoError(t, err)
 	kubelet.imageManager = &fakeImageGCManager{
 		fakeImageService: fakeRuntime,
diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
index 37479dd5b58..2f6ef8e6977 100644
--- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"go.opentelemetry.io/otel/trace"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -90,7 +91,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID)
 
 func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}
 
-func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
+func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
 	ctx := context.Background()
 	recorder := &record.FakeRecorder{}
 	logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
@@ -122,7 +123,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
 	}
 
 	podStateProvider := newFakePodStateProvider()
-	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
+	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager, tracer)
 	kubeRuntimeManager.podStateProvider = podStateProvider
 	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
 	kubeRuntimeManager.imagePuller = images.NewImageManager(
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_gc.go b/pkg/kubelet/kuberuntime/kuberuntime_gc.go
index eba6159c79e..a91e190ee75 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_gc.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_gc.go
@@ -24,6 +24,7 @@ import (
 	"sort"
 	"time"
 
+	"go.opentelemetry.io/otel/trace"
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -38,14 +39,16 @@ type containerGC struct {
 	client           internalapi.RuntimeService
 	manager          *kubeGenericRuntimeManager
 	podStateProvider podStateProvider
+	tracer           trace.Tracer
 }
 
 // NewContainerGC creates a new containerGC.
-func newContainerGC(client internalapi.RuntimeService, podStateProvider podStateProvider, manager *kubeGenericRuntimeManager) *containerGC {
+func newContainerGC(client internalapi.RuntimeService, podStateProvider podStateProvider, manager *kubeGenericRuntimeManager, tracer trace.Tracer) *containerGC {
 	return &containerGC{
 		client:           client,
 		manager:          manager,
 		podStateProvider: podStateProvider,
+		tracer:           tracer,
 	}
 }
 
@@ -407,6 +410,8 @@ func (cgc *containerGC) evictPodLogsDirectories(ctx context.Context, allSourcesR
 // * gets evictable sandboxes which are not ready and contains no containers.
 // * removes evictable sandboxes.
 func (cgc *containerGC) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
+	ctx, otelSpan := cgc.tracer.Start(ctx, "Containers/GarbageCollect")
+	defer otelSpan.End()
 	errors := []error{}
 	// Remove evictable containers
 	if err := cgc.evictContainers(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods); err != nil {
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index ca3dc026784..f08fdc59c0c 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"go.opentelemetry.io/otel/trace"
 	crierror "k8s.io/cri-api/pkg/errors"
 
 	"k8s.io/klog/v2"
@@ -77,6 +78,8 @@ const (
 	versionCacheTTL = 60 * time.Second
 	// How frequently to report identical errors
 	identicalErrorDelay = 1 * time.Minute
+	// OpenTelemetry instrumentation scope name
+	instrumentationScope = "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 )
 
 var (
@@ -205,10 +208,12 @@ func NewKubeGenericRuntimeManager(
 	getNodeAllocatable func() v1.ResourceList,
 	memoryThrottlingFactor float64,
 	podPullingTimeRecorder images.ImagePodPullingTimeRecorder,
+	tracerProvider trace.TracerProvider,
 ) (KubeGenericRuntime, error) {
 	ctx := context.Background()
 	runtimeService = newInstrumentedRuntimeService(runtimeService)
 	imageService = newInstrumentedImageManagerService(imageService)
+	tracer := tracerProvider.Tracer(instrumentationScope)
 	kubeRuntimeManager := &kubeGenericRuntimeManager{
 		recorder:    recorder,
 		cpuCFSQuota: cpuCFSQuota,
@@ -281,7 +286,7 @@ func NewKubeGenericRuntimeManager(
 		imagePullBurst,
 		podPullingTimeRecorder)
 	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder)
-	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
+	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager, tracer)
 	kubeRuntimeManager.podStateProvider = podStateProvider
 
 	kubeRuntimeManager.versionCache = cache.NewObjectCache(
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
index 0747ea91e0a..d49857835ac 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
@@ -31,6 +31,7 @@ import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	oteltrace "go.opentelemetry.io/otel/trace"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -69,7 +70,7 @@ func customTestRuntimeManager(keyring *credentialprovider.BasicDockerKeyring) (*
 		MemoryCapacity: uint64(memoryCapacityQuantity.Value()),
 	}
 	osInterface := &containertest.FakeOS{}
-	manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, keyring)
+	manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, keyring, oteltrace.NewNoopTracerProvider().Tracer(""))
 	return fakeRuntimeService, fakeImageService, manager, err
}
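
For readers who want to try the instrumentation outside the kubelet, here is a minimal, self-contained sketch of the same TracerProvider -> Tracer -> span pattern the patch wires up. It is not part of the patch: the stdout exporter and the pod attribute values are assumptions chosen for illustration. In the kubelet itself the provider comes from kubeDeps.TracerProvider, and the tests above substitute oteltrace.NewNoopTracerProvider().

    package main

    import (
    	"context"
    	"log"

    	"go.opentelemetry.io/otel/attribute"
    	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    	sdktrace "go.opentelemetry.io/otel/sdk/trace"
    	"go.opentelemetry.io/otel/trace"
    )

    func main() {
    	// The exporter choice is illustrative; any SpanExporter works here.
    	exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
    	if err != nil {
    		log.Fatal(err)
    	}
    	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
    	// Shutdown flushes the batch processor before the program exits.
    	defer func() { _ = tp.Shutdown(context.Background()) }()

    	// Same scope/span/attribute shape as the kubelet's syncPod span above;
    	// the pod values are placeholders, not real kubelet state.
    	tracer := tp.Tracer("k8s.io/kubernetes/pkg/kubelet")
    	_, span := tracer.Start(context.Background(), "syncPod", trace.WithAttributes(
    		attribute.String("k8s.pod.uid", "00000000-0000-0000-0000-000000000000"),
    		attribute.String("k8s.pod.name", "example-pod"),
    		attribute.String("k8s.namespace.name", "default"),
    	))
    	span.End()
    }

Running this prints one pretty-printed span named syncPod carrying the k8s.* attributes, which is the shape a trace backend would receive from the spans added in this patch; swapping in the noop provider keeps every Start/End call site intact while recording nothing, which is why the test wiring above stays so small.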