kubelet: create top-level traces for pod sync and GC

This starts a new top-level OpenTelemetry span each time SyncPod is invoked and each time image or container garbage collection runs.
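In outline, every instrumented entry point follows the same pattern: a trace.Tracer is derived once from the configured TracerProvider, and each invocation starts a span, attaches identifying attributes, and ends the span when the operation returns. A minimal, self-contained sketch of that pattern (the function name syncPodWithSpan and its parameters are illustrative, not part of the commit):

package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// syncPodWithSpan mirrors the shape of the instrumented kubelet methods:
// start a span from the stored tracer, tag it with pod identity, and
// guarantee the span is ended when the sync returns.
func syncPodWithSpan(ctx context.Context, tracer trace.Tracer, podName, podNamespace, podUID string) error {
	ctx, otelSpan := tracer.Start(ctx, "syncPod", trace.WithAttributes(
		attribute.String("k8s.pod.uid", podUID),
		attribute.String("k8s.pod.name", podName),
		attribute.String("k8s.namespace.name", podNamespace),
	))
	defer otelSpan.End()
	_ = ctx // the real methods thread this derived context into the sync work
	return nil
}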
parent c6f3007071
commit 556d774945
@@ -24,3 +24,4 @@ file_content_in_loop
 break_on_expected_content
 Premium_LRS
 VCP_STRESS_ITERATIONS
+update_type
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"

+	"go.opentelemetry.io/otel/trace"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"

@@ -38,6 +39,9 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 )

+// instrumentationScope is OpenTelemetry instrumentation scope name
+const instrumentationScope = "k8s.io/kubernetes/pkg/kubelet/images"
+
 // StatsProvider is an interface for fetching stats used during image garbage
 // collection.
 type StatsProvider interface {
@@ -104,6 +108,9 @@ type realImageGCManager struct {

 	// sandbox image exempted from GC
 	sandboxImage string
+
+	// tracer for recording spans
+	tracer trace.Tracer
 }

 // imageCache caches latest result of ListImages.
@@ -153,7 +160,7 @@ type imageRecord struct {
 }

 // NewImageGCManager instantiates a new ImageGCManager object.
-func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, sandboxImage string) (ImageGCManager, error) {
+func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, recorder record.EventRecorder, nodeRef *v1.ObjectReference, policy ImageGCPolicy, sandboxImage string, tracerProvider trace.TracerProvider) (ImageGCManager, error) {
 	// Validate policy.
 	if policy.HighThresholdPercent < 0 || policy.HighThresholdPercent > 100 {
 		return nil, fmt.Errorf("invalid HighThresholdPercent %d, must be in range [0-100]", policy.HighThresholdPercent)
@@ -164,6 +171,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
 	if policy.LowThresholdPercent > policy.HighThresholdPercent {
 		return nil, fmt.Errorf("LowThresholdPercent %d can not be higher than HighThresholdPercent %d", policy.LowThresholdPercent, policy.HighThresholdPercent)
 	}
+	tracer := tracerProvider.Tracer(instrumentationScope)
 	im := &realImageGCManager{
 		runtime: runtime,
 		policy: policy,
@@ -173,6 +181,7 @@ func NewImageGCManager(runtime container.Runtime, statsProvider StatsProvider, r
 		nodeRef: nodeRef,
 		initialized: false,
 		sandboxImage: sandboxImage,
+		tracer: tracer,
 	}

 	return im, nil
@@ -279,6 +288,8 @@ func (im *realImageGCManager) detectImages(ctx context.Context, detectTime time.
 }

 func (im *realImageGCManager) GarbageCollect(ctx context.Context) error {
+	ctx, otelSpan := im.tracer.Start(ctx, "Images/GarbageCollect")
+	defer otelSpan.End()
 	// Get disk usage on disk holding images.
 	fsStats, err := im.statsProvider.ImageFsStats(ctx)
 	if err != nil {
@@ -27,6 +27,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

+	oteltrace "go.opentelemetry.io/otel/trace"
 	"k8s.io/client-go/tools/record"
 	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
 	"k8s.io/kubernetes/pkg/kubelet/container"
@@ -48,6 +49,7 @@ func newRealImageGCManager(policy ImageGCPolicy, mockStatsProvider stats.Provide
 		statsProvider: mockStatsProvider,
 		recorder: &record.FakeRecorder{},
 		sandboxImage: sandboxImage,
+		tracer: oteltrace.NewNoopTracerProvider().Tracer(""),
 	}, fakeRuntime
 }

@@ -543,7 +545,7 @@ func TestGarbageCollectBelowLowThreshold(t *testing.T) {
 	manager, _ := newRealImageGCManager(policy, mockStatsProvider)

 	// Expect 40% usage.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(600),
 		CapacityBytes: uint64Ptr(1000),
 	}, nil)
@@ -562,7 +564,7 @@ func TestGarbageCollectCadvisorFailure(t *testing.T) {
 	mockStatsProvider := statstest.NewMockProvider(mockCtrl)
 	manager, _ := newRealImageGCManager(policy, mockStatsProvider)

-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{}, fmt.Errorf("error"))
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{}, fmt.Errorf("error"))
 	assert.NotNil(t, manager.GarbageCollect(ctx))
 }

@@ -579,7 +581,7 @@ func TestGarbageCollectBelowSuccess(t *testing.T) {
 	manager, fakeRuntime := newRealImageGCManager(policy, mockStatsProvider)

 	// Expect 95% usage and most of it gets freed.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(50),
 		CapacityBytes: uint64Ptr(1000),
 	}, nil)
@@ -602,7 +604,7 @@ func TestGarbageCollectNotEnoughFreed(t *testing.T) {
 	manager, fakeRuntime := newRealImageGCManager(policy, mockStatsProvider)

 	// Expect 95% usage and little of it gets freed.
-	mockStatsProvider.EXPECT().ImageFsStats(ctx).Return(&statsapi.FsStats{
+	mockStatsProvider.EXPECT().ImageFsStats(gomock.Any()).Return(&statsapi.FsStats{
 		AvailableBytes: uint64Ptr(50),
 		CapacityBytes: uint64Ptr(1000),
 	}, nil)
@@ -717,7 +719,7 @@ func TestValidateImageGCPolicy(t *testing.T) {
 	}

 	for _, tc := range testCases {
-		if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, ""); err != nil {
+		if _, err := NewImageGCManager(nil, nil, nil, nil, tc.imageGCPolicy, "", oteltrace.NewNoopTracerProvider()); err != nil {
 			if err.Error() != tc.expectErr {
 				t.Errorf("[%s:]Expected err:%v, but got:%v", tc.name, tc.expectErr, err.Error())
 			}
@@ -37,6 +37,7 @@ import (

 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"

 	"k8s.io/mount-utils"
@@ -198,6 +199,9 @@ const (

 	// nodeLeaseRenewIntervalFraction is the fraction of lease duration to renew the lease
 	nodeLeaseRenewIntervalFraction = 0.25
+
+	// instrumentationScope is the name of OpenTelemetry instrumentation scope
+	instrumentationScope = "k8s.io/kubernetes/pkg/kubelet"
 )

 var (
@@ -511,6 +515,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		insecureContainerLifecycleHTTPClient.CheckRedirect = httpprobe.RedirectChecker(false)
 	}

+	tracer := kubeDeps.TracerProvider.Tracer(instrumentationScope)
+
 	klet := &Kubelet{
 		hostname: hostname,
 		hostnameOverridden: hostnameOverridden,
@@ -561,6 +567,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
 		keepTerminatedPodVolumes: keepTerminatedPodVolumes,
 		nodeStatusMaxImages: nodeStatusMaxImages,
+		tracer: tracer,
 	}

 	if klet.cloud != nil {
@@ -677,6 +684,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
 		*kubeCfg.MemoryThrottlingFactor,
 		kubeDeps.PodStartupLatencyTracker,
+		kubeDeps.TracerProvider,
 	)
 	if err != nil {
 		return nil, err
@@ -760,7 +768,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

 	// setup imageManager
-	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
+	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage, kubeDeps.TracerProvider)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
 	}
@@ -1252,6 +1260,9 @@ type Kubelet struct {

 	// Mutex to serialize new pod admission and existing pod resizing
 	podResizeMutex sync.Mutex
+
+	// OpenTelemetry Tracer
+	tracer trace.Tracer
 }

 // ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
@@ -1612,10 +1623,17 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
 	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
 	// Currently, using that context causes test failures.
-	ctx := context.TODO()
+	ctx, otelSpan := kl.tracer.Start(context.TODO(), "syncPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.pod.update_type", updateType.String()),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
 	klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer func() {
 		klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
+		otelSpan.End()
 	}()

 	// Latency measurements for the main workflow are relative to the
@@ -1882,7 +1900,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
 	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
 	// Currently, using that context causes test failures.
-	ctx := context.Background()
+	ctx, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatingPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
+	defer otelSpan.End()
 	klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

@@ -2004,6 +2028,13 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
 // This typically occurs when a pod is force deleted from configuration (local disk or API) and the
 // kubelet restarts in the middle of the action.
 func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
+	_, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatedPod", trace.WithAttributes(
+		attribute.String("k8s.pod.uid", string(pod.UID)),
+		attribute.String("k8s.pod", klog.KObj(pod).String()),
+		attribute.String("k8s.pod.name", pod.Name),
+		attribute.String("k8s.namespace.name", pod.Namespace),
+	))
+	defer otelSpan.End()
 	klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 	defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

@@ -253,6 +253,7 @@ func newTestKubeletWithImageList(
 	kubelet.cadvisor = &cadvisortest.Fake{}
 	machineInfo, _ := kubelet.cadvisor.MachineInfo()
 	kubelet.setCachedMachineInfo(machineInfo)
+	kubelet.tracer = oteltrace.NewNoopTracerProvider().Tracer("")

 	fakeMirrorClient := podtest.NewFakeMirrorClient()
 	secretManager := secret.NewSimpleSecretManager(kubelet.kubeClient)
@@ -305,7 +306,7 @@ func newTestKubeletWithImageList(
 		HighThresholdPercent: 90,
 		LowThresholdPercent: 80,
 	}
-	imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, "")
+	imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, "", oteltrace.NewNoopTracerProvider())
 	assert.NoError(t, err)
 	kubelet.imageManager = &fakeImageGCManager{
 		fakeImageService: fakeRuntime,
@@ -22,6 +22,7 @@ import (
 	"time"

 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"go.opentelemetry.io/otel/trace"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -90,7 +91,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageStartedPulling(podUID types.UID)

 func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID) {}

-func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
+func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
 	ctx := context.Background()
 	recorder := &record.FakeRecorder{}
 	logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
@@ -122,7 +123,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
 	}

 	podStateProvider := newFakePodStateProvider()
-	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
+	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager, tracer)
 	kubeRuntimeManager.podStateProvider = podStateProvider
 	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
 	kubeRuntimeManager.imagePuller = images.NewImageManager(
@@ -24,6 +24,7 @@ import (
 	"sort"
 	"time"

+	"go.opentelemetry.io/otel/trace"
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -38,14 +39,16 @@ type containerGC struct {
 	client internalapi.RuntimeService
 	manager *kubeGenericRuntimeManager
 	podStateProvider podStateProvider
+	tracer trace.Tracer
 }

 // NewContainerGC creates a new containerGC.
-func newContainerGC(client internalapi.RuntimeService, podStateProvider podStateProvider, manager *kubeGenericRuntimeManager) *containerGC {
+func newContainerGC(client internalapi.RuntimeService, podStateProvider podStateProvider, manager *kubeGenericRuntimeManager, tracer trace.Tracer) *containerGC {
 	return &containerGC{
 		client: client,
 		manager: manager,
 		podStateProvider: podStateProvider,
+		tracer: tracer,
 	}
 }

@@ -407,6 +410,8 @@ func (cgc *containerGC) evictPodLogsDirectories(ctx context.Context, allSourcesR
 // * gets evictable sandboxes which are not ready and contains no containers.
 // * removes evictable sandboxes.
 func (cgc *containerGC) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
+	ctx, otelSpan := cgc.tracer.Start(ctx, "Containers/GarbageCollect")
+	defer otelSpan.End()
 	errors := []error{}
 	// Remove evictable containers
 	if err := cgc.evictContainers(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods); err != nil {
@@ -26,6 +26,7 @@ import (
 	"time"

 	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"go.opentelemetry.io/otel/trace"
 	crierror "k8s.io/cri-api/pkg/errors"
 	"k8s.io/klog/v2"

@@ -77,6 +78,8 @@ const (
 	versionCacheTTL = 60 * time.Second
 	// How frequently to report identical errors
 	identicalErrorDelay = 1 * time.Minute
+	// OpenTelemetry instrumentation scope name
+	instrumentationScope = "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 )

 var (
@@ -205,10 +208,12 @@ func NewKubeGenericRuntimeManager(
 	getNodeAllocatable func() v1.ResourceList,
 	memoryThrottlingFactor float64,
 	podPullingTimeRecorder images.ImagePodPullingTimeRecorder,
+	tracerProvider trace.TracerProvider,
 ) (KubeGenericRuntime, error) {
 	ctx := context.Background()
 	runtimeService = newInstrumentedRuntimeService(runtimeService)
 	imageService = newInstrumentedImageManagerService(imageService)
+	tracer := tracerProvider.Tracer(instrumentationScope)
 	kubeRuntimeManager := &kubeGenericRuntimeManager{
 		recorder: recorder,
 		cpuCFSQuota: cpuCFSQuota,
@@ -281,7 +286,7 @@ func NewKubeGenericRuntimeManager(
 		imagePullBurst,
 		podPullingTimeRecorder)
 	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder)
-	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
+	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager, tracer)
 	kubeRuntimeManager.podStateProvider = podStateProvider

 	kubeRuntimeManager.versionCache = cache.NewObjectCache(
@@ -31,6 +31,7 @@ import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	oteltrace "go.opentelemetry.io/otel/trace"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -69,7 +70,7 @@ func customTestRuntimeManager(keyring *credentialprovider.BasicDockerKeyring) (*
 		MemoryCapacity: uint64(memoryCapacityQuantity.Value()),
 	}
 	osInterface := &containertest.FakeOS{}
-	manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, keyring)
+	manager, err := newFakeKubeRuntimeManager(fakeRuntimeService, fakeImageService, machineInfo, osInterface, &containertest.FakeRuntimeHelper{}, keyring, oteltrace.NewNoopTracerProvider().Tracer(""))
 	return fakeRuntimeService, fakeImageService, manager, err
 }
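A note on the test hunks above: the gomock expectations switch from ImageFsStats(ctx) to ImageFsStats(gomock.Any()) because GarbageCollect now derives a fresh child context from tracer.Start before calling the stats provider, so an exact match on the test's context would no longer succeed. Tests that construct the managers directly satisfy the new tracer fields with a no-op tracer, along these lines (a sketch, assuming go.opentelemetry.io/otel/trace is imported as oteltrace alongside context):

	// A no-op tracer still returns valid (inert) spans and derived contexts,
	// so the production span-handling code paths run without exporting anything.
	tracer := oteltrace.NewNoopTracerProvider().Tracer("")
	ctx, span := tracer.Start(context.Background(), "Images/GarbageCollect")
	defer span.End()
	_ = ctx // distinct from context.Background(), hence gomock.Any() in the mocks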