kubelet: enable configurable rotation duration and parallel rotate

This commit is contained in:
Harsha Narayana 2022-12-06 11:37:50 +05:30
parent f99638d315
commit ab8c784ee9
No known key found for this signature in database
GPG Key ID: 2246EFB056217292
19 changed files with 393 additions and 130 deletions

View File

@ -58809,6 +58809,19 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen
Format: "int32",
},
},
"containerLogMaxWorkers": {
SchemaProps: spec.SchemaProps{
Description: "ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn for performing the log rotate operations. Set this count to 1 for disabling the concurrent log rotation workflows Default: 1",
Type: []string{"integer"},
Format: "int32",
},
},
"containerLogMonitorInterval": {
SchemaProps: spec.SchemaProps{
Description: "ContainerLogMonitorInterval specifies the duration at which the container logs are monitored for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be customized to a smaller value based on the log generation rate and the size required to be rotated against Default: 10s",
Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"),
},
},
"configMapAndSecretChangeDetectionStrategy": {
SchemaProps: spec.SchemaProps{
Description: "configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret managers are running. Valid values include:\n\n- `Get`: kubelet fetches necessary objects directly from the API server; - `Cache`: kubelet uses TTL cache for object fetched from the API server; - `Watch`: kubelet uses watches to observe changes to objects that are in its interest.\n\nDefault: \"Watch\"",

View File

@ -106,6 +106,8 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.StaticPodURLHeader = make(map[string][]string)
obj.ContainerLogMaxFiles = 5
obj.ContainerLogMaxSize = "10Mi"
obj.ContainerLogMaxWorkers = 1
obj.ContainerLogMonitorInterval = metav1.Duration{Duration: 10 * time.Second}
obj.ConfigMapAndSecretChangeDetectionStrategy = "Watch"
obj.AllowedUnsafeSysctls = []string{}
obj.VolumePluginDir = kubeletconfigv1beta1.DefaultVolumePluginDir

View File

@ -185,6 +185,8 @@ var (
"ConfigMapAndSecretChangeDetectionStrategy",
"ContainerLogMaxFiles",
"ContainerLogMaxSize",
"ContainerLogMaxWorkers",
"ContainerLogMonitorInterval",
"ContentType",
"EnableContentionProfiling",
"EnableControllerAttachDetach",

View File

@ -17,6 +17,8 @@ cgroupsPerQOS: true
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
containerLogMaxWorkers: 1
containerLogMonitorInterval: 10s
containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true

View File

@ -17,6 +17,8 @@ cgroupsPerQOS: true
configMapAndSecretChangeDetectionStrategy: Watch
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
containerLogMaxWorkers: 1
containerLogMonitorInterval: 10s
containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true

View File

@ -347,6 +347,11 @@ type KubeletConfiguration struct {
ContainerLogMaxSize string
// Maximum number of container log files that can be present for a container.
ContainerLogMaxFiles int32
// Maximum number of concurrent log rotation workers to spawn for processing the log rotation
// requests
ContainerLogMaxWorkers int32
// Interval at which the container logs are monitored for rotation
ContainerLogMonitorInterval metav1.Duration
// ConfigMapAndSecretChangeDetectionStrategy is a mode in which config map and secret managers are running.
ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy
// A comma separated allowlist of unsafe sysctls or sysctl patterns (ending in `*`).

View File

@ -239,6 +239,12 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
if obj.ContainerLogMaxFiles == nil {
obj.ContainerLogMaxFiles = utilpointer.Int32(5)
}
if obj.ContainerLogMaxWorkers == nil {
obj.ContainerLogMaxWorkers = utilpointer.Int32(1)
}
if obj.ContainerLogMonitorInterval == nil {
obj.ContainerLogMonitorInterval = &metav1.Duration{Duration: 10 * time.Second}
}
if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
}

View File

@ -112,6 +112,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32(5),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,
@ -227,6 +229,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
ContainerLogMaxSize: "",
ContainerLogMaxFiles: utilpointer.Int32(0),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
SystemReserved: map[string]string{},
KubeReserved: map[string]string{},
@ -278,67 +282,69 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
CacheUnauthorizedTTL: metav1.Duration{Duration: 30 * time.Second},
},
},
RegistryPullQPS: utilpointer.Int32(0),
RegistryBurst: 10,
EventRecordQPS: utilpointer.Int32(0),
EventBurst: 100,
EnableDebuggingHandlers: utilpointer.Bool(false),
HealthzPort: utilpointer.Int32(0),
HealthzBindAddress: "127.0.0.1",
OOMScoreAdj: utilpointer.Int32(0),
ClusterDNS: []string{},
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second},
NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute},
NodeLeaseDurationSeconds: 40,
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute},
ImageGCHighThresholdPercent: utilpointer.Int32(0),
ImageGCLowThresholdPercent: utilpointer.Int32(0),
VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute},
CgroupsPerQOS: utilpointer.Bool(false),
CgroupDriver: "cgroupfs",
CPUManagerPolicy: "none",
CPUManagerPolicyOptions: map[string]string{},
CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second},
MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy,
TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy,
TopologyManagerScope: v1beta1.ContainerTopologyManagerScope,
QOSReserved: map[string]string{},
RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute},
HairpinMode: v1beta1.PromiscuousBridge,
MaxPods: 110,
PodPidsLimit: utilpointer.Int64(0),
ResolverConfig: utilpointer.String(""),
CPUCFSQuota: utilpointer.Bool(false),
CPUCFSQuotaPeriod: &zeroDuration,
NodeStatusMaxImages: utilpointer.Int32(0),
MaxOpenFiles: 1000000,
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: utilpointer.Int32(0),
KubeAPIBurst: 100,
SerializeImagePulls: utilpointer.Bool(false),
MaxParallelImagePulls: nil,
EvictionHard: map[string]string{},
EvictionSoft: map[string]string{},
EvictionSoftGracePeriod: map[string]string{},
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EvictionMinimumReclaim: map[string]string{},
EnableControllerAttachDetach: utilpointer.Bool(false),
MakeIPTablesUtilChains: utilpointer.Bool(false),
IPTablesMasqueradeBit: utilpointer.Int32(0),
IPTablesDropBit: utilpointer.Int32(0),
FeatureGates: map[string]bool{},
FailSwapOn: utilpointer.Bool(false),
MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32(0),
RegistryPullQPS: utilpointer.Int32(0),
RegistryBurst: 10,
EventRecordQPS: utilpointer.Int32(0),
EventBurst: 100,
EnableDebuggingHandlers: utilpointer.Bool(false),
HealthzPort: utilpointer.Int32(0),
HealthzBindAddress: "127.0.0.1",
OOMScoreAdj: utilpointer.Int32(0),
ClusterDNS: []string{},
StreamingConnectionIdleTimeout: metav1.Duration{Duration: 4 * time.Hour},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Second},
NodeStatusReportFrequency: metav1.Duration{Duration: 5 * time.Minute},
NodeLeaseDurationSeconds: 40,
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ImageMinimumGCAge: metav1.Duration{Duration: 2 * time.Minute},
ImageGCHighThresholdPercent: utilpointer.Int32(0),
ImageGCLowThresholdPercent: utilpointer.Int32(0),
VolumeStatsAggPeriod: metav1.Duration{Duration: time.Minute},
CgroupsPerQOS: utilpointer.Bool(false),
CgroupDriver: "cgroupfs",
CPUManagerPolicy: "none",
CPUManagerPolicyOptions: map[string]string{},
CPUManagerReconcilePeriod: metav1.Duration{Duration: 10 * time.Second},
MemoryManagerPolicy: v1beta1.NoneMemoryManagerPolicy,
TopologyManagerPolicy: v1beta1.NoneTopologyManagerPolicy,
TopologyManagerScope: v1beta1.ContainerTopologyManagerScope,
QOSReserved: map[string]string{},
RuntimeRequestTimeout: metav1.Duration{Duration: 2 * time.Minute},
HairpinMode: v1beta1.PromiscuousBridge,
MaxPods: 110,
PodPidsLimit: utilpointer.Int64(0),
ResolverConfig: utilpointer.String(""),
CPUCFSQuota: utilpointer.Bool(false),
CPUCFSQuotaPeriod: &zeroDuration,
NodeStatusMaxImages: utilpointer.Int32(0),
MaxOpenFiles: 1000000,
ContentType: "application/vnd.kubernetes.protobuf",
KubeAPIQPS: utilpointer.Int32(0),
KubeAPIBurst: 100,
SerializeImagePulls: utilpointer.Bool(false),
MaxParallelImagePulls: nil,
EvictionHard: map[string]string{},
EvictionSoft: map[string]string{},
EvictionSoftGracePeriod: map[string]string{},
EvictionPressureTransitionPeriod: metav1.Duration{Duration: 5 * time.Minute},
EvictionMinimumReclaim: map[string]string{},
EnableControllerAttachDetach: utilpointer.Bool(false),
MakeIPTablesUtilChains: utilpointer.Bool(false),
IPTablesMasqueradeBit: utilpointer.Int32(0),
IPTablesDropBit: utilpointer.Int32(0),
FeatureGates: map[string]bool{},
FailSwapOn: utilpointer.Bool(false),
MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32(0),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
SystemReserved: map[string]string{},
KubeReserved: map[string]string{},
EnforceNodeAllocatable: []string{},
AllowedUnsafeSysctls: []string{},
VolumePluginDir: DefaultVolumePluginDir,
SystemReserved: map[string]string{},
KubeReserved: map[string]string{},
EnforceNodeAllocatable: []string{},
AllowedUnsafeSysctls: []string{},
VolumePluginDir: DefaultVolumePluginDir,
Logging: logsapi.LoggingConfiguration{
Format: "text",
FlushFrequency: logsapi.TimeOrMetaDuration{Duration: metav1.Duration{Duration: 5 * time.Second}, SerializeAsString: true},
@ -465,6 +471,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
ContainerLogMaxSize: "1Mi",
ContainerLogMaxFiles: utilpointer.Int32(1),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
SystemReserved: map[string]string{
"memory": "1Gi",
@ -611,6 +619,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
MemorySwap: v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
ContainerLogMaxSize: "1Mi",
ContainerLogMaxFiles: utilpointer.Int32(1),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
SystemReserved: map[string]string{
"memory": "1Gi",
@ -720,7 +730,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit),
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32Ptr(5),
ContainerLogMaxFiles: utilpointer.Int32(5),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,
@ -809,7 +821,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
IPTablesDropBit: utilpointer.Int32Ptr(DefaultIPTablesDropBit),
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32Ptr(5),
ContainerLogMaxFiles: utilpointer.Int32(5),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,
@ -899,6 +913,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
FailSwapOn: utilpointer.Bool(true),
ContainerLogMaxSize: "10Mi",
ContainerLogMaxFiles: utilpointer.Int32(5),
ContainerLogMaxWorkers: utilpointer.Int32(1),
ContainerLogMonitorInterval: &metav1.Duration{Duration: 10 * time.Second},
ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
EnforceNodeAllocatable: DefaultNodeAllocatableEnforcement,
VolumePluginDir: DefaultVolumePluginDir,

View File

@ -476,6 +476,12 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil {
return err
}
if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil {
return err
}
out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
@ -664,6 +670,12 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil {
return err
}
if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil {
return err
}
out.ConfigMapAndSecretChangeDetectionStrategy = v1beta1.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
out.AllowedUnsafeSysctls = *(*[]string)(unsafe.Pointer(&in.AllowedUnsafeSysctls))
out.KernelMemcgNotification = in.KernelMemcgNotification

View File

@ -279,5 +279,12 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
fmt.Errorf("invalid configuration: enableSystemLogHandler is required for enableSystemLogQuery"))
}
if kc.ContainerLogMaxWorkers < 1 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMaxWorkers must be greater than or equal to 1"))
}
if kc.ContainerLogMonitorInterval.Duration.Seconds() < 3 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s"))
}
return utilerrors.NewAggregate(allErrors)
}

View File

@ -74,7 +74,9 @@ var (
Logging: logsapi.LoggingConfiguration{
Format: "text",
},
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ContainerRuntimeEndpoint: "unix:///run/containerd/containerd.sock",
ContainerLogMaxWorkers: 1,
ContainerLogMonitorInterval: metav1.Duration{Duration: 10 * time.Second},
}
)
@ -545,6 +547,27 @@ func TestValidateKubeletConfiguration(t *testing.T) {
return conf
},
errMsg: "invalid configuration: imageMaximumGCAge 1ns must be greater than imageMinimumGCAge 2ns",
}, {
name: "containerLogMaxWorkers must be greater than or equal to 1",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.ContainerLogMaxWorkers = 0
return conf
},
errMsg: "invalid configuration: containerLogMaxWorkers must be greater than or equal to 1",
}, {
name: "containerLogMonitorInterval must be a positive time duration",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.ContainerLogMonitorInterval = metav1.Duration{Duration: -1 * time.Second}
return conf
},
errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s",
}, {
name: "containerLogMonitorInterval must be at least 3s or higher",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
conf.ContainerLogMonitorInterval = metav1.Duration{Duration: 2 * time.Second}
return conf
},
errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s",
}}
for _, tc := range cases {

View File

@ -270,6 +270,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
}
}
out.MemorySwap = in.MemorySwap
out.ContainerLogMonitorInterval = in.ContainerLogMonitorInterval
if in.AllowedUnsafeSysctls != nil {
in, out := &in.AllowedUnsafeSysctls, &out.AllowedUnsafeSysctls
*out = make([]string, len(*in))

View File

@ -623,6 +623,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps.OSInterface,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
int(kubeCfg.ContainerLogMaxWorkers),
kubeCfg.ContainerLogMonitorInterval,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)

View File

@ -94,7 +94,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
ctx := context.Background()
recorder := &record.FakeRecorder{}
logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2, 10, metav1.Duration{Duration: 10 * time.Second})
if err != nil {
return nil, err
}

View File

@ -26,11 +26,12 @@ import (
"sort"
"strings"
"sync"
"time"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -39,9 +40,6 @@ import (
)
const (
// logMonitorPeriod is the period container log manager monitors
// container logs and performs log rotation.
logMonitorPeriod = 10 * time.Second
// timestampFormat is format of the timestamp suffix for rotated log.
// See https://golang.org/pkg/time/#Time.Format.
timestampFormat = "20060102-150405"
@ -143,15 +141,18 @@ func parseMaxSize(size string) (int64, error) {
}
type containerLogManager struct {
runtimeService internalapi.RuntimeService
osInterface kubecontainer.OSInterface
policy LogRotatePolicy
clock clock.Clock
mutex sync.Mutex
runtimeService internalapi.RuntimeService
osInterface kubecontainer.OSInterface
policy LogRotatePolicy
clock clock.Clock
mutex sync.Mutex
queue workqueue.RateLimitingInterface
maxWorkers int
monitoringPeriod metav1.Duration
}
// NewContainerLogManager creates a new container log manager.
func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) {
func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int, maxWorkers int, monitorInterval metav1.Duration) (ContainerLogManager, error) {
if maxFiles <= 1 {
return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
}
@ -171,20 +172,28 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterfa
MaxSize: parsedMaxSize,
MaxFiles: maxFiles,
},
clock: clock.RealClock{},
mutex: sync.Mutex{},
clock: clock.RealClock{},
mutex: sync.Mutex{},
maxWorkers: maxWorkers,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
monitoringPeriod: monitorInterval,
}, nil
}
// Start the container log manager.
func (c *containerLogManager) Start() {
ctx := context.Background()
klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
for i := 0; i < c.maxWorkers; i++ {
worker := i + 1
go c.processQueueItems(ctx, worker)
}
// Start a goroutine periodically does container log rotation.
go wait.Forever(func() {
if err := c.rotateLogs(ctx); err != nil {
klog.ErrorS(err, "Failed to rotate container logs")
}
}, logMonitorPeriod)
}, c.monitoringPeriod.Duration)
}
// Clean removes all logs of specified container (including rotated one).
@ -213,65 +222,91 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err
return nil
}
func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) {
klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker)
for c.processContainer(ctx, worker) {
}
klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker)
}
func (c *containerLogManager) rotateLogs(ctx context.Context) error {
c.mutex.Lock()
defer c.mutex.Unlock()
klog.V(4).InfoS("Starting container log rotation sequence")
// TODO(#59998): Use kubelet pod cache.
containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %v", err)
}
// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
for _, container := range containers {
// Only rotate logs for running containers. Non-running containers won't
// generate new output, it doesn't make sense to keep an empty latest log.
if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
continue
}
id := container.GetId()
// Note that we should not block log rotate for an error of a single container.
resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
if err != nil {
klog.ErrorS(err, "Failed to get container status", "containerID", id)
continue
}
if resp.GetStatus() == nil {
klog.ErrorS(err, "Container status is nil", "containerID", id)
continue
}
path := resp.GetStatus().GetLogPath()
info, err := c.osInterface.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to stat container log", "path", path)
continue
}
// In rotateLatestLog, there are several cases that we may
// lose original container log after ReopenContainerLog fails.
// We try to recover it by reopening container log.
if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "containerID", id, "path", path)
continue
}
// The container log should be recovered.
info, err = c.osInterface.Stat(path)
if err != nil {
klog.ErrorS(err, "Failed to stat container log after reopen", "path", path)
continue
}
}
if info.Size() < c.policy.MaxSize {
continue
}
// Perform log rotation.
if err := c.rotateLog(ctx, id, path); err != nil {
klog.ErrorS(err, "Failed to rotate log for container", "path", path, "containerID", id)
continue
// Doing this to avoid additional overhead with logging of label like arguments that can prove costly
if v := klog.V(4); v.Enabled() {
klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
}
c.queue.Add(container.GetId())
}
return nil
}
func (c *containerLogManager) processContainer(ctx context.Context, worker int) (ok bool) {
key, quit := c.queue.Get()
if quit {
return false
}
defer func() {
c.queue.Done(key)
c.queue.Forget(key)
}()
// Always default the return to true to keep the processing of Queue ongoing
ok = true
id := key.(string)
resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
if err != nil {
klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id)
return
}
if resp.GetStatus() == nil {
klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id)
return
}
path := resp.GetStatus().GetLogPath()
info, err := c.osInterface.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
return
}
if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
return
}
info, err = c.osInterface.Stat(path)
if err != nil {
klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
return
}
}
if info.Size() < c.policy.MaxSize {
klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
return
}
if err := c.rotateLog(ctx, id, path); err != nil {
klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
return
}
return
}
func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error {
// pattern is used to match all rotated files.
pattern := fmt.Sprintf("%s.*", log)

View File

@ -23,11 +23,15 @@ import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/kubelet/container"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -92,8 +96,12 @@ func TestRotateLogs(t *testing.T) {
MaxSize: testMaxSize,
MaxFiles: testMaxFiles,
},
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
mutex: sync.Mutex{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
maxWorkers: 10,
monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
}
testLogs := []string{
"test-log-1",
@ -149,8 +157,16 @@ func TestRotateLogs(t *testing.T) {
},
}
f.SetFakeContainers(testContainers)
go c.processQueueItems(ctx, 1)
require.NoError(t, c.rotateLogs(ctx))
pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err = wait.PollUntilContextCancel(pollTimeoutCtx, 20*time.Millisecond, false, func(ctx context.Context) (done bool, err error) {
return c.queue.Len() == 0, nil
})
require.NoError(t, err)
timestamp := now.Format(timestampFormat)
logs, err := os.ReadDir(dir)
require.NoError(t, err)
@ -160,6 +176,7 @@ func TestRotateLogs(t *testing.T) {
assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name())
assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name())
assert.Equal(t, testLogs[3], logs[4].Name())
c.queue.ShutDown()
}
func TestClean(t *testing.T) {
@ -180,8 +197,12 @@ func TestClean(t *testing.T) {
MaxSize: testMaxSize,
MaxFiles: testMaxFiles,
},
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
mutex: sync.Mutex{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
maxWorkers: 10,
monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
}
testLogs := []string{
"test-log-1",
@ -383,10 +404,14 @@ func TestRotateLatestLog(t *testing.T) {
now := time.Now()
f := critest.NewFakeRuntimeService()
c := &containerLogManager{
runtimeService: f,
policy: LogRotatePolicy{MaxFiles: test.maxFiles},
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
runtimeService: f,
policy: LogRotatePolicy{MaxFiles: test.maxFiles},
osInterface: container.RealOS{},
clock: testingclock.NewFakeClock(now),
mutex: sync.Mutex{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
maxWorkers: 10,
monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
}
if test.runtimeError != nil {
f.InjectError("ReopenContainerLog", test.runtimeError)

View File

@ -595,6 +595,21 @@ type KubeletConfiguration struct {
// Default: 5
// +optional
ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"`
// ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn
// for performing the log rotate operations. Set this count to 1 for disabling the
// concurrent log rotation workflows
// Default: 1
// +optional
ContainerLogMaxWorkers *int32 `json:"containerLogMaxWorkers,omitempty"`
// ContainerLogMonitorInterval specifies the duration at which the container logs are monitored
// for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be
// customized to a smaller value based on the log generation rate and the size required to be
// rotated against
// Default: 10s
// +optional
ContainerLogMonitorInterval *metav1.Duration `json:"containerLogMonitorInterval,omitempty"`
// configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret
// managers are running. Valid values include:
//

View File

@ -384,6 +384,16 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = new(int32)
**out = **in
}
if in.ContainerLogMaxWorkers != nil {
in, out := &in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers
*out = new(int32)
**out = **in
}
if in.ContainerLogMonitorInterval != nil {
in, out := &in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval
*out = new(v1.Duration)
**out = **in
}
if in.SystemReserved != nil {
in, out := &in.SystemReserved, &out.SystemReserved
*out = make(map[string]string, len(*in))

View File

@ -34,11 +34,13 @@ import (
)
const (
testContainerLogMaxFiles = 3
testContainerLogMaxSize = "40Ki"
rotationPollInterval = 5 * time.Second
rotationEventuallyTimeout = 3 * time.Minute
rotationConsistentlyTimeout = 2 * time.Minute
testContainerLogMaxFiles = 3
testContainerLogMaxSize = "40Ki"
testContainerLogMaxWorkers = 2
testContainerLogMonitorInterval = 2 * time.Second
rotationPollInterval = 5 * time.Second
rotationEventuallyTimeout = 3 * time.Minute
rotationConsistentlyTimeout = 2 * time.Minute
)
var _ = SIGDescribe("ContainerLogRotation", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), func() {
@ -106,3 +108,86 @@ var _ = SIGDescribe("ContainerLogRotation", framework.WithSlow(), framework.With
})
})
})
var _ = SIGDescribe("ContainerLogRotationWithMultipleWorkers", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), func() {
f := framework.NewDefaultFramework("container-log-rotation-test-multi-worker")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.Context("when a container generates a lot of logs", func() {
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
initialConfig.ContainerLogMaxFiles = testContainerLogMaxFiles
initialConfig.ContainerLogMaxSize = testContainerLogMaxSize
initialConfig.ContainerLogMaxWorkers = testContainerLogMaxWorkers
initialConfig.ContainerLogMonitorInterval = metav1.Duration{Duration: testContainerLogMonitorInterval}
})
var logRotationPods []*v1.Pod
ginkgo.BeforeEach(func(ctx context.Context) {
ginkgo.By("create log container 1")
for _, name := range []string{"test-container-log-rotation", "test-container-log-rotation-1"} {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "log-container",
Image: busyboxImage,
Command: []string{
"sh",
"-c",
// ~12Kb/s. Exceeding 40Kb in 4 seconds. Log rotation period is 10 seconds.
"while true; do echo hello world; sleep 0.001; done;",
},
},
},
},
}
logRotationPod := e2epod.NewPodClient(f).CreateSync(ctx, pod)
logRotationPods = append(logRotationPods, logRotationPod)
ginkgo.DeferCleanup(e2epod.NewPodClient(f).DeleteSync, logRotationPod.Name, metav1.DeleteOptions{}, time.Minute)
}
})
ginkgo.It("should be rotated and limited to a fixed amount of files", func(ctx context.Context) {
ginkgo.By("get container log path")
var logPaths []string
for _, pod := range logRotationPods {
gomega.Expect(pod.Status.ContainerStatuses).To(gomega.HaveLen(1), "log rotation pod should have one container")
id := kubecontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
r, _, err := getCRIClient()
framework.ExpectNoError(err, "should connect to CRI and obtain runtime service clients and image service client")
resp, err := r.ContainerStatus(context.Background(), id, false)
framework.ExpectNoError(err)
logPaths = append(logPaths, resp.GetStatus().GetLogPath())
}
ginkgo.By("wait for container log being rotated to max file limit")
gomega.Eventually(ctx, func() (int, error) {
var logFiles []string
for _, logPath := range logPaths {
logs, err := kubelogs.GetAllLogs(logPath)
if err != nil {
return 0, err
}
logFiles = append(logFiles, logs...)
}
return len(logFiles), nil
}, rotationEventuallyTimeout, rotationPollInterval).Should(gomega.Equal(testContainerLogMaxFiles*2), "should eventually rotate to max file limit")
ginkgo.By("make sure container log number won't exceed max file limit")
gomega.Consistently(ctx, func() (int, error) {
var logFiles []string
for _, logPath := range logPaths {
logs, err := kubelogs.GetAllLogs(logPath)
if err != nil {
return 0, err
}
logFiles = append(logFiles, logs...)
}
return len(logFiles), nil
}, rotationConsistentlyTimeout, rotationPollInterval).Should(gomega.BeNumerically("<=", testContainerLogMaxFiles*2), "should never exceed max file limit")
})
})
})