From ab8c784ee970d72b03fd1c2ed7c228914e17e954 Mon Sep 17 00:00:00 2001 From: Harsha Narayana Date: Tue, 6 Dec 2022 11:37:50 +0530 Subject: [PATCH] kubelet: enable configurable rotation duration and parallel rotate --- pkg/generated/openapi/zz_generated.openapi.go | 13 ++ pkg/kubelet/apis/config/fuzzer/fuzzer.go | 2 + pkg/kubelet/apis/config/helpers_test.go | 2 + .../KubeletConfiguration/after/v1beta1.yaml | 2 + .../roundtrip/default/v1beta1.yaml | 2 + pkg/kubelet/apis/config/types.go | 5 + pkg/kubelet/apis/config/v1beta1/defaults.go | 6 + .../apis/config/v1beta1/defaults_test.go | 140 +++++++++-------- .../config/v1beta1/zz_generated.conversion.go | 12 ++ .../apis/config/validation/validation.go | 7 + .../apis/config/validation/validation_test.go | 25 +++- .../apis/config/zz_generated.deepcopy.go | 1 + pkg/kubelet/kubelet.go | 2 + .../kuberuntime/fake_kuberuntime_manager.go | 2 +- pkg/kubelet/logs/container_log_manager.go | 141 +++++++++++------- .../logs/container_log_manager_test.go | 41 ++++- .../k8s.io/kubelet/config/v1beta1/types.go | 15 ++ .../config/v1beta1/zz_generated.deepcopy.go | 10 ++ test/e2e_node/container_log_rotation_test.go | 95 +++++++++++- 19 files changed, 393 insertions(+), 130 deletions(-) diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 84136e51c00..81696e7e5cd 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -58809,6 +58809,19 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen Format: "int32", }, }, + "containerLogMaxWorkers": { + SchemaProps: spec.SchemaProps{ + Description: "ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn for performing the log rotate operations. 
Set this count to 1 for disabling the concurrent log rotation workflows Default: 1", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "containerLogMonitorInterval": { + SchemaProps: spec.SchemaProps{ + Description: "ContainerLogMonitorInterval specifies the duration at which the container logs are monitored for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be customized to a smaller value based on the log generation rate and the size required to be rotated against Default: 10s", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"), + }, + }, "configMapAndSecretChangeDetectionStrategy": { SchemaProps: spec.SchemaProps{ Description: "configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret managers are running. Valid values include:\n\n- `Get`: kubelet fetches necessary objects directly from the API server; - `Cache`: kubelet uses TTL cache for object fetched from the API server; - `Watch`: kubelet uses watches to observe changes to objects that are in its interest.\n\nDefault: \"Watch\"", diff --git a/pkg/kubelet/apis/config/fuzzer/fuzzer.go b/pkg/kubelet/apis/config/fuzzer/fuzzer.go index dfa988c0d04..22191bbf192 100644 --- a/pkg/kubelet/apis/config/fuzzer/fuzzer.go +++ b/pkg/kubelet/apis/config/fuzzer/fuzzer.go @@ -106,6 +106,8 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} { obj.StaticPodURLHeader = make(map[string][]string) obj.ContainerLogMaxFiles = 5 obj.ContainerLogMaxSize = "10Mi" + obj.ContainerLogMaxWorkers = 1 + obj.ContainerLogMonitorInterval = metav1.Duration{Duration: 10 * time.Second} obj.ConfigMapAndSecretChangeDetectionStrategy = "Watch" obj.AllowedUnsafeSysctls = []string{} obj.VolumePluginDir = kubeletconfigv1beta1.DefaultVolumePluginDir diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 31f406b85f2..efda747a92d 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ 
b/pkg/kubelet/apis/config/helpers_test.go @@ -185,6 +185,8 @@ var ( "ConfigMapAndSecretChangeDetectionStrategy", "ContainerLogMaxFiles", "ContainerLogMaxSize", + "ContainerLogMaxWorkers", + "ContainerLogMonitorInterval", "ContentType", "EnableContentionProfiling", "EnableControllerAttachDetach", diff --git a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml index def3f0dc844..fceac6179a1 100644 --- a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml +++ b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml @@ -17,6 +17,8 @@ cgroupsPerQOS: true configMapAndSecretChangeDetectionStrategy: Watch containerLogMaxFiles: 5 containerLogMaxSize: 10Mi +containerLogMaxWorkers: 1 +containerLogMonitorInterval: 10s containerRuntimeEndpoint: unix:///run/containerd/containerd.sock contentType: application/vnd.kubernetes.protobuf cpuCFSQuota: true diff --git a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml index ca5cf18b983..a1e94c8974b 100644 --- a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml +++ b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml @@ -17,6 +17,8 @@ cgroupsPerQOS: true configMapAndSecretChangeDetectionStrategy: Watch containerLogMaxFiles: 5 containerLogMaxSize: 10Mi +containerLogMaxWorkers: 1 +containerLogMonitorInterval: 10s containerRuntimeEndpoint: unix:///run/containerd/containerd.sock contentType: application/vnd.kubernetes.protobuf cpuCFSQuota: true diff --git a/pkg/kubelet/apis/config/types.go b/pkg/kubelet/apis/config/types.go index a64724a58d1..42fe0fa92be 100644 --- a/pkg/kubelet/apis/config/types.go +++ b/pkg/kubelet/apis/config/types.go @@ -347,6 +347,11 @@ type 
KubeletConfiguration struct { ContainerLogMaxSize string // Maximum number of container log files that can be present for a container. ContainerLogMaxFiles int32 + // Maximum number of concurrent log rotation workers to spawn for processing the log rotation + // requests + ContainerLogMaxWorkers int32 + // Interval at which the container logs are monitored for rotation + ContainerLogMonitorInterval metav1.Duration // ConfigMapAndSecretChangeDetectionStrategy is a mode in which config map and secret managers are running. ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy // A comma separated allowlist of unsafe sysctls or sysctl patterns (ending in `*`). diff --git a/pkg/kubelet/apis/config/v1beta1/defaults.go b/pkg/kubelet/apis/config/v1beta1/defaults.go index bf52fc2396a..dccf3538078 100644 --- a/pkg/kubelet/apis/config/v1beta1/defaults.go +++ b/pkg/kubelet/apis/config/v1beta1/defaults.go @@ -239,6 +239,12 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura if obj.ContainerLogMaxFiles == nil { obj.ContainerLogMaxFiles = utilpointer.Int32(5) } + if obj.ContainerLogMaxWorkers == nil { + obj.ContainerLogMaxWorkers = utilpointer.Int32(1) + } + if obj.ContainerLogMonitorInterval == nil { + obj.ContainerLogMonitorInterval = &metav1.Duration{Duration: 10 * time.Second} + } if obj.ConfigMapAndSecretChangeDetectionStrategy == "" { obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy } diff --git a/pkg/kubelet/apis/config/v1beta1/defaults_test.go b/pkg/kubelet/apis/config/v1beta1/defaults_test.go index 55a6068e44d..ac15a2d8b0d 100644 --- a/pkg/kubelet/apis/config/v1beta1/defaults_test.go +++ b/pkg/kubelet/apis/config/v1beta1/defaults_test.go @@ -112,6 +112,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { FailSwapOn: utilpointer.Bool(true), ContainerLogMaxSize: "10Mi", ContainerLogMaxFiles: utilpointer.Int32(5), + ContainerLogMaxWorkers: utilpointer.Int32(1), + 
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, VolumePluginDir: DefaultVolumePluginDir, @@ -227,6 +229,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""}, ContainerLogMaxSize: "", ContainerLogMaxFiles: utilpointer.Int32(0), + ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, SystemReserved: map[string]string{}, KubeReserved: map[string]string{}, @@ -278,67 +282,69 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second}, }, }, - RegistryPullQPS: utilpointer.Int32(0), - RegistryBurst: 10, - EventRecordQPS: utilpointer.Int32(0), - EventBurst: 100, - EnableDebuggingHandlers: utilpointer.Bool(false), - HealthzPort: utilpointer.Int32(0), - HealthzBindAddress: "127.0.0.1", - OOMScoreAdj: utilpointer.Int32(0), - ClusterDNS: []string{}, - StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour}, - NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second}, - NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute}, - NodeLeaseDurationSeconds: 40, - ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", - ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute}, - ImageGCHighThresholdPercent: utilpointer.Int32(0), - ImageGCLowThresholdPercent: utilpointer.Int32(0), - VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute}, - CgroupsPerQOS: utilpointer.Bool(false), - CgroupDriver: "cgroupfs", - CPUManagerPolicy: "none", - CPUManagerPolicyOptions: map[string]string{}, - CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second}, - 
MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy, - TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy, - TopologyManagerScope: v1beta1.ContainerTopologyManagerScope, - QOSReserved: map[string]string{}, - RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute}, - HairpinMode: v1beta1.PromiscuousBridge, - MaxPods: 110, - PodPidsLimit: utilpointer.Int64(0), - ResolverConfig: utilpointer.String(""), - CPUCFSQuota: utilpointer.Bool(false), - CPUCFSQuotaPeriod: &zeroDuration, - NodeStatusMaxImages: utilpointer.Int32(0), - MaxOpenFiles: 1000000, - ContentType: "application/vnd.kubernetes.protobuf", - KubeAPIQPS: utilpointer.Int32(0), - KubeAPIBurst: 100, - SerializeImagePulls: utilpointer.Bool(false), - MaxParallelImagePulls: nil, - EvictionHard: map[string]string{}, - EvictionSoft: map[string]string{}, - EvictionSoftGracePeriod: map[string]string{}, - EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, - EvictionMinimumReclaim: map[string]string{}, - EnableControllerAttachDetach: utilpointer.Bool(false), - MakeIPTablesUtilChains: utilpointer.Bool(false), - IPTablesMasqueradeBit: utilpointer.Int32(0), - IPTablesDropBit: utilpointer.Int32(0), - FeatureGates: map[string]bool{}, - FailSwapOn: utilpointer.Bool(false), - MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""}, - ContainerLogMaxSize: "10Mi", - ContainerLogMaxFiles: utilpointer.Int32(0), + RegistryPullQPS: utilpointer.Int32(0), + RegistryBurst: 10, + EventRecordQPS: utilpointer.Int32(0), + EventBurst: 100, + EnableDebuggingHandlers: utilpointer.Bool(false), + HealthzPort: utilpointer.Int32(0), + HealthzBindAddress: "127.0.0.1", + OOMScoreAdj: utilpointer.Int32(0), + ClusterDNS: []string{}, + StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour}, + NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second}, + NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute}, + NodeLeaseDurationSeconds: 40, + 
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", + ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute}, + ImageGCHighThresholdPercent: utilpointer.Int32(0), + ImageGCLowThresholdPercent: utilpointer.Int32(0), + VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute}, + CgroupsPerQOS: utilpointer.Bool(false), + CgroupDriver: "cgroupfs", + CPUManagerPolicy: "none", + CPUManagerPolicyOptions: map[string]string{}, + CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second}, + MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy, + TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy, + TopologyManagerScope: v1beta1.ContainerTopologyManagerScope, + QOSReserved: map[string]string{}, + RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute}, + HairpinMode: v1beta1.PromiscuousBridge, + MaxPods: 110, + PodPidsLimit: utilpointer.Int64(0), + ResolverConfig: utilpointer.String(""), + CPUCFSQuota: utilpointer.Bool(false), + CPUCFSQuotaPeriod: &zeroDuration, + NodeStatusMaxImages: utilpointer.Int32(0), + MaxOpenFiles: 1000000, + ContentType: "application/vnd.kubernetes.protobuf", + KubeAPIQPS: utilpointer.Int32(0), + KubeAPIBurst: 100, + SerializeImagePulls: utilpointer.Bool(false), + MaxParallelImagePulls: nil, + EvictionHard: map[string]string{}, + EvictionSoft: map[string]string{}, + EvictionSoftGracePeriod: map[string]string{}, + EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute}, + EvictionMinimumReclaim: map[string]string{}, + EnableControllerAttachDetach: utilpointer.Bool(false), + MakeIPTablesUtilChains: utilpointer.Bool(false), + IPTablesMasqueradeBit: utilpointer.Int32(0), + IPTablesDropBit: utilpointer.Int32(0), + FeatureGates: map[string]bool{}, + FailSwapOn: utilpointer.Bool(false), + MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""}, + ContainerLogMaxSize: "10Mi", + ContainerLogMaxFiles: utilpointer.Int32(0), + ContainerLogMaxWorkers: utilpointer.Int32(1), + 
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, - SystemReserved: map[string]string{}, - KubeReserved: map[string]string{}, - EnforceNodeAllocatable: []string{}, - AllowedUnsafeSysctls: []string{}, - VolumePluginDir: DefaultVolumePluginDir, + SystemReserved: map[string]string{}, + KubeReserved: map[string]string{}, + EnforceNodeAllocatable: []string{}, + AllowedUnsafeSysctls: []string{}, + VolumePluginDir: DefaultVolumePluginDir, Logging: logsapi.LoggingConfiguration{ Format: "text", FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: 5 * time.Second}, SerializeAsString: true}, @@ -465,6 +471,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"}, ContainerLogMaxSize: "1Mi", ContainerLogMaxFiles: utilpointer.Int32(1), + ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy, SystemReserved: map[string]string{ "memory": "1Gi", @@ -611,6 +619,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"}, ContainerLogMaxSize: "1Mi", ContainerLogMaxFiles: utilpointer.Int32(1), + ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy, SystemReserved: map[string]string{ "memory": "1Gi", @@ -720,7 +730,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit), FailSwapOn: utilpointer.Bool(true), ContainerLogMaxSize: "10Mi", - ContainerLogMaxFiles: utilpointer.Int32Ptr(5), + ContainerLogMaxFiles: utilpointer.Int32(5), + 
ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, VolumePluginDir: DefaultVolumePluginDir, @@ -809,7 +821,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit), FailSwapOn: utilpointer.Bool(true), ContainerLogMaxSize: "10Mi", - ContainerLogMaxFiles: utilpointer.Int32Ptr(5), + ContainerLogMaxFiles: utilpointer.Int32(5), + ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, VolumePluginDir: DefaultVolumePluginDir, @@ -899,6 +913,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) { FailSwapOn: utilpointer.Bool(true), ContainerLogMaxSize: "10Mi", ContainerLogMaxFiles: utilpointer.Int32(5), + ContainerLogMaxWorkers: utilpointer.Int32(1), + ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second}, ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy, EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement, VolumePluginDir: DefaultVolumePluginDir, diff --git a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go index 3704be3e1a1..0befe8c1a74 100644 --- a/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/config/v1beta1/zz_generated.conversion.go @@ -476,6 +476,12 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil { return err } + if err := 
v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil { + return err + } + if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil { + return err + } out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy) out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved)) out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved)) @@ -664,6 +670,12 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil { return err } + if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil { + return err + } + if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil { + return err + } out.ConfigMapAndSecretChangeDetectionStrategy = v1beta1.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy) out.AllowedUnsafeSysctls = *(*[]string)(unsafe.Pointer(&in.AllowedUnsafeSysctls)) out.KernelMemcgNotification = in.KernelMemcgNotification diff --git a/pkg/kubelet/apis/config/validation/validation.go b/pkg/kubelet/apis/config/validation/validation.go index dd46a477cfd..9db7e17f891 100644 --- a/pkg/kubelet/apis/config/validation/validation.go +++ b/pkg/kubelet/apis/config/validation/validation.go @@ -279,5 +279,12 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur fmt.Errorf("invalid configuration: enableSystemLogHandler is required for enableSystemLogQuery")) } + if kc.ContainerLogMaxWorkers < 1 { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMaxWorkers must be greater than or equal to 1")) + } 
+ + if kc.ContainerLogMonitorInterval.Duration.Seconds() < 3 { + allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s")) + } return utilerrors.NewAggregate(allErrors) } diff --git a/pkg/kubelet/apis/config/validation/validation_test.go b/pkg/kubelet/apis/config/validation/validation_test.go index 913c5dc3bdf..e50fd8741b5 100644 --- a/pkg/kubelet/apis/config/validation/validation_test.go +++ b/pkg/kubelet/apis/config/validation/validation_test.go @@ -74,7 +74,9 @@ var ( Logging: logsapi.LoggingConfiguration{ Format: "text", }, - ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", + ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock", + ContainerLogMaxWorkers: 1, + ContainerLogMonitorInterval: metav1.Duration{Duration: 10 * time.Second}, } ) @@ -545,6 +547,27 @@ func TestValidateKubeletConfiguration(t *testing.T) { return conf }, errMsg: "invalid configuration: imageMaximumGCAge 1ns must be greater than imageMinimumGCAge 2ns", + }, { + name: "containerLogMaxWorkers must be greater than or equal to 1", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + conf.ContainerLogMaxWorkers = 0 + return conf + }, + errMsg: "invalid configuration: containerLogMaxWorkers must be greater than or equal to 1", + }, { + name: "containerLogMonitorInterval must be a positive time duration", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + conf.ContainerLogMonitorInterval = metav1.Duration{Duration: -1 * time.Second} + return conf + }, + errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s", + }, { + name: "containerLogMonitorInterval must be at least 3s or higher", + configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration { + 
conf.ContainerLogMonitorInterval = metav1.Duration{Duration: 2 * time.Second} + return conf + }, + errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s", }} for _, tc := range cases { diff --git a/pkg/kubelet/apis/config/zz_generated.deepcopy.go b/pkg/kubelet/apis/config/zz_generated.deepcopy.go index e2c0cc1dd1e..dc2df3bcee4 100644 --- a/pkg/kubelet/apis/config/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/config/zz_generated.deepcopy.go @@ -270,6 +270,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { } } out.MemorySwap = in.MemorySwap + out.ContainerLogMonitorInterval = in.ContainerLogMonitorInterval if in.AllowedUnsafeSysctls != nil { in, out := &in.AllowedUnsafeSysctls, &out.AllowedUnsafeSysctls *out = make([]string, len(*in)) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1d412181500..dbf8f554c07 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -623,6 +623,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps.OSInterface, kubeCfg.ContainerLogMaxSize, int(kubeCfg.ContainerLogMaxFiles), + int(kubeCfg.ContainerLogMaxWorkers), + kubeCfg.ContainerLogMonitorInterval, ) if err != nil { return nil, fmt.Errorf("failed to initialize container log manager: %v", err) diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index 2f6ef8e6977..f8ac208f937 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -94,7 +94,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring 
credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) { ctx := context.Background() recorder := &record.FakeRecorder{} - logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2) + logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2, 10, metav1.Duration{Duration: 10 * time.Second}) if err != nil { return nil, err } diff --git a/pkg/kubelet/logs/container_log_manager.go b/pkg/kubelet/logs/container_log_manager.go index b3d6b063160..aff93d97e95 100644 --- a/pkg/kubelet/logs/container_log_manager.go +++ b/pkg/kubelet/logs/container_log_manager.go @@ -26,11 +26,12 @@ import ( "sort" "strings" "sync" - "time" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -39,9 +40,6 @@ import ( ) const ( - // logMonitorPeriod is the period container log manager monitors - // container logs and performs log rotation. - logMonitorPeriod = 10 * time.Second // timestampFormat is format of the timestamp suffix for rotated log. // See https://golang.org/pkg/time/#Time.Format. timestampFormat = "20060102-150405" @@ -143,15 +141,18 @@ func parseMaxSize(size string) (int64, error) { } type containerLogManager struct { - runtimeService internalapi.RuntimeService - osInterface kubecontainer.OSInterface - policy LogRotatePolicy - clock clock.Clock - mutex sync.Mutex + runtimeService internalapi.RuntimeService + osInterface kubecontainer.OSInterface + policy LogRotatePolicy + clock clock.Clock + mutex sync.Mutex + queue workqueue.RateLimitingInterface + maxWorkers int + monitoringPeriod metav1.Duration } // NewContainerLogManager creates a new container log manager. 
-func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) { +func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int, maxWorkers int, monitorInterval metav1.Duration) (ContainerLogManager, error) { if maxFiles <= 1 { return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles) } @@ -171,20 +172,28 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterfa MaxSize: parsedMaxSize, MaxFiles: maxFiles, }, - clock: clock.RealClock{}, - mutex: sync.Mutex{}, + clock: clock.RealClock{}, + mutex: sync.Mutex{}, + maxWorkers: maxWorkers, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"), + monitoringPeriod: monitorInterval, }, nil } // Start the container log manager. func (c *containerLogManager) Start() { ctx := context.Background() + klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod) + for i := 0; i < c.maxWorkers; i++ { + worker := i + 1 + go c.processQueueItems(ctx, worker) + } // Start a goroutine periodically does container log rotation. go wait.Forever(func() { if err := c.rotateLogs(ctx); err != nil { klog.ErrorS(err, "Failed to rotate container logs") } - }, logMonitorPeriod) + }, c.monitoringPeriod.Duration) } // Clean removes all logs of specified container (including rotated one). 
@@ -213,65 +222,91 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err return nil } +func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) { + klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker) + for c.processContainer(ctx, worker) { + } + klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker) +} + func (c *containerLogManager) rotateLogs(ctx context.Context) error { c.mutex.Lock() defer c.mutex.Unlock() + klog.V(4).InfoS("Starting container log rotation sequence") // TODO(#59998): Use kubelet pod cache. containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{}) if err != nil { return fmt.Errorf("failed to list containers: %v", err) } - // NOTE(random-liu): Figure out whether we need to rotate container logs in parallel. for _, container := range containers { // Only rotate logs for running containers. Non-running containers won't // generate new output, it doesn't make sense to keep an empty latest log. if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING { continue } - id := container.GetId() - // Note that we should not block log rotate for an error of a single container. - resp, err := c.runtimeService.ContainerStatus(ctx, id, false) - if err != nil { - klog.ErrorS(err, "Failed to get container status", "containerID", id) - continue - } - if resp.GetStatus() == nil { - klog.ErrorS(err, "Container status is nil", "containerID", id) - continue - } - path := resp.GetStatus().GetLogPath() - info, err := c.osInterface.Stat(path) - if err != nil { - if !os.IsNotExist(err) { - klog.ErrorS(err, "Failed to stat container log", "path", path) - continue - } - // In rotateLatestLog, there are several cases that we may - // lose original container log after ReopenContainerLog fails. - // We try to recover it by reopening container log. 
- if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil { - klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "containerID", id, "path", path) - continue - } - // The container log should be recovered. - info, err = c.osInterface.Stat(path) - if err != nil { - klog.ErrorS(err, "Failed to stat container log after reopen", "path", path) - continue - } - } - if info.Size() < c.policy.MaxSize { - continue - } - // Perform log rotation. - if err := c.rotateLog(ctx, id, path); err != nil { - klog.ErrorS(err, "Failed to rotate log for container", "path", path, "containerID", id) - continue + // Check the verbosity first so that the label-like arguments, which can be costly to evaluate, are only computed when V(4) logging is enabled + if v := klog.V(4); v.Enabled() { + klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels()) } + c.queue.Add(container.GetId()) } return nil } +func (c *containerLogManager) processContainer(ctx context.Context, worker int) (ok bool) { + key, quit := c.queue.Get() + if quit { + return false + } + defer func() { + c.queue.Done(key) + c.queue.Forget(key) + }() + // Always default the return to true to keep the processing of Queue ongoing + ok = true + id := key.(string) + + resp, err := c.runtimeService.ContainerStatus(ctx, id, false) + if err != nil { + klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id) + return + } + if resp.GetStatus() == nil { + klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id) + return + } + path := resp.GetStatus().GetLogPath() + info, err := c.osInterface.Stat(path) + + if err != nil { + if !os.IsNotExist(err) { + klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path) + return + } + + if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil { + klog.ErrorS(err, "Container log doesn't exist, reopen 
container log failed", "worker", worker, "containerID", id, "path", path) + return + } + + info, err = c.osInterface.Stat(path) + if err != nil { + klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path) + return + } + } + if info.Size() < c.policy.MaxSize { + klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) + return + } + + if err := c.rotateLog(ctx, id, path); err != nil { + klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) + return + } + return +} + func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error { // pattern is used to match all rotated files. pattern := fmt.Sprintf("%s.*", log) diff --git a/pkg/kubelet/logs/container_log_manager_test.go b/pkg/kubelet/logs/container_log_manager_test.go index 59fc3da073c..7d6e7055274 100644 --- a/pkg/kubelet/logs/container_log_manager_test.go +++ b/pkg/kubelet/logs/container_log_manager_test.go @@ -23,11 +23,15 @@ import ( "io" "os" "path/filepath" + "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/kubelet/container" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -92,8 +96,12 @@ func TestRotateLogs(t *testing.T) { MaxSize: testMaxSize, MaxFiles: testMaxFiles, }, - osInterface: container.RealOS{}, - clock: testingclock.NewFakeClock(now), + osInterface: container.RealOS{}, + clock: testingclock.NewFakeClock(now), + mutex: sync.Mutex{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"), + maxWorkers: 10, + monitoringPeriod: v1.Duration{Duration: 10 * 
time.Second}, } testLogs := []string{ "test-log-1", @@ -149,8 +157,16 @@ func TestRotateLogs(t *testing.T) { }, } f.SetFakeContainers(testContainers) + go c.processQueueItems(ctx, 1) require.NoError(t, c.rotateLogs(ctx)) + pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + err = wait.PollUntilContextCancel(pollTimeoutCtx, 20*time.Millisecond, false, func(ctx context.Context) (done bool, err error) { + return c.queue.Len() == 0, nil + }) + require.NoError(t, err) + timestamp := now.Format(timestampFormat) logs, err := os.ReadDir(dir) require.NoError(t, err) @@ -160,6 +176,7 @@ func TestRotateLogs(t *testing.T) { assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name()) assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name()) assert.Equal(t, testLogs[3], logs[4].Name()) + c.queue.ShutDown() } func TestClean(t *testing.T) { @@ -180,8 +197,12 @@ func TestClean(t *testing.T) { MaxSize: testMaxSize, MaxFiles: testMaxFiles, }, - osInterface: container.RealOS{}, - clock: testingclock.NewFakeClock(now), + osInterface: container.RealOS{}, + clock: testingclock.NewFakeClock(now), + mutex: sync.Mutex{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"), + maxWorkers: 10, + monitoringPeriod: v1.Duration{Duration: 10 * time.Second}, } testLogs := []string{ "test-log-1", @@ -383,10 +404,14 @@ func TestRotateLatestLog(t *testing.T) { now := time.Now() f := critest.NewFakeRuntimeService() c := &containerLogManager{ - runtimeService: f, - policy: LogRotatePolicy{MaxFiles: test.maxFiles}, - osInterface: container.RealOS{}, - clock: testingclock.NewFakeClock(now), + runtimeService: f, + policy: LogRotatePolicy{MaxFiles: test.maxFiles}, + osInterface: container.RealOS{}, + clock: testingclock.NewFakeClock(now), + mutex: sync.Mutex{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"), + maxWorkers: 10, + 
monitoringPeriod: v1.Duration{Duration: 10 * time.Second}, } if test.runtimeError != nil { f.InjectError("ReopenContainerLog", test.runtimeError) diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/types.go b/staging/src/k8s.io/kubelet/config/v1beta1/types.go index fd1439e9ebf..da8c938d479 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/types.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/types.go @@ -595,6 +595,21 @@ type KubeletConfiguration struct { // Default: 5 // +optional ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"` + + // ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn + // for performing the log rotate operations. Set this count to 1 for disabling the + // concurrent log rotation workflows + // Default: 1 + // +optional + ContainerLogMaxWorkers *int32 `json:"containerLogMaxWorkers,omitempty"` + + // ContainerLogMonitorInterval specifies the duration at which the container logs are monitored + // for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be + // customized to a smaller value based on the log generation rate and the size required to be + // rotated against + // Default: 10s + // +optional + ContainerLogMonitorInterval *metav1.Duration `json:"containerLogMonitorInterval,omitempty"` // configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret // managers are running. 
Valid values include: // diff --git a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go index ff653a9923c..0e20d63ed4d 100644 --- a/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kubelet/config/v1beta1/zz_generated.deepcopy.go @@ -384,6 +384,16 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { *out = new(int32) **out = **in } + if in.ContainerLogMaxWorkers != nil { + in, out := &in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers + *out = new(int32) + **out = **in + } + if in.ContainerLogMonitorInterval != nil { + in, out := &in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval + *out = new(v1.Duration) + **out = **in + } if in.SystemReserved != nil { in, out := &in.SystemReserved, &out.SystemReserved *out = make(map[string]string, len(*in)) diff --git a/test/e2e_node/container_log_rotation_test.go b/test/e2e_node/container_log_rotation_test.go index 5f83247ed0a..1912777b22f 100644 --- a/test/e2e_node/container_log_rotation_test.go +++ b/test/e2e_node/container_log_rotation_test.go @@ -34,11 +34,13 @@ import ( ) const ( - testContainerLogMaxFiles = 3 - testContainerLogMaxSize = "40Ki" - rotationPollInterval = 5 * time.Second - rotationEventuallyTimeout = 3 * time.Minute - rotationConsistentlyTimeout = 2 * time.Minute + testContainerLogMaxFiles = 3 + testContainerLogMaxSize = "40Ki" + testContainerLogMaxWorkers = 2 + testContainerLogMonitorInterval = 2 * time.Second + rotationPollInterval = 5 * time.Second + rotationEventuallyTimeout = 3 * time.Minute + rotationConsistentlyTimeout = 2 * time.Minute ) var _ = SIGDescribe("ContainerLogRotation", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), func() { @@ -106,3 +108,86 @@ var _ = SIGDescribe("ContainerLogRotation", framework.WithSlow(), framework.With }) }) }) + +var _ = 
SIGDescribe("ContainerLogRotationWithMultipleWorkers", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), func() { + f := framework.NewDefaultFramework("container-log-rotation-test-multi-worker") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + ginkgo.Context("when a container generates a lot of logs", func() { + tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) { + initialConfig.ContainerLogMaxFiles = testContainerLogMaxFiles + initialConfig.ContainerLogMaxSize = testContainerLogMaxSize + initialConfig.ContainerLogMaxWorkers = testContainerLogMaxWorkers + initialConfig.ContainerLogMonitorInterval = metav1.Duration{Duration: testContainerLogMonitorInterval} + }) + + var logRotationPods []*v1.Pod + ginkgo.BeforeEach(func(ctx context.Context) { + ginkgo.By("create log container 1") + for _, name := range []string{"test-container-log-rotation", "test-container-log-rotation-1"} { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: "log-container", + Image: busyboxImage, + Command: []string{ + "sh", + "-c", + // ~12Kb/s. Exceeding 40Kb in 4 seconds. Log monitor interval is 2 seconds (testContainerLogMonitorInterval). 
+ "while true; do echo hello world; sleep 0.001; done;", + }, + }, + }, + }, + } + logRotationPod := e2epod.NewPodClient(f).CreateSync(ctx, pod) + logRotationPods = append(logRotationPods, logRotationPod) + ginkgo.DeferCleanup(e2epod.NewPodClient(f).DeleteSync, logRotationPod.Name, metav1.DeleteOptions{}, time.Minute) + } + }) + + ginkgo.It("should be rotated and limited to a fixed amount of files", func(ctx context.Context) { + ginkgo.By("get container log path") + var logPaths []string + for _, pod := range logRotationPods { + gomega.Expect(pod.Status.ContainerStatuses).To(gomega.HaveLen(1), "log rotation pod should have one container") + id := kubecontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID + r, _, err := getCRIClient() + framework.ExpectNoError(err, "should connect to CRI and obtain runtime service clients and image service client") + resp, err := r.ContainerStatus(context.Background(), id, false) + framework.ExpectNoError(err) + logPaths = append(logPaths, resp.GetStatus().GetLogPath()) + } + + ginkgo.By("wait for container log being rotated to max file limit") + gomega.Eventually(ctx, func() (int, error) { + var logFiles []string + for _, logPath := range logPaths { + logs, err := kubelogs.GetAllLogs(logPath) + if err != nil { + return 0, err + } + logFiles = append(logFiles, logs...) + } + return len(logFiles), nil + }, rotationEventuallyTimeout, rotationPollInterval).Should(gomega.Equal(testContainerLogMaxFiles*2), "should eventually rotate to max file limit") + ginkgo.By("make sure container log number won't exceed max file limit") + + gomega.Consistently(ctx, func() (int, error) { + var logFiles []string + for _, logPath := range logPaths { + logs, err := kubelogs.GetAllLogs(logPath) + if err != nil { + return 0, err + } + logFiles = append(logFiles, logs...) 
+ } + return len(logFiles), nil + }, rotationConsistentlyTimeout, rotationPollInterval).Should(gomega.BeNumerically("<=", testContainerLogMaxFiles*2), "should never exceed max file limit") + }) + }) +})