diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index 38a47eb535d..916f10d19e5 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -527,6 +527,8 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig fs.BoolVar(&c.MakeIPTablesUtilChains, "make-iptables-util-chains", c.MakeIPTablesUtilChains, "If true, kubelet will ensure iptables utility rules are present on host.") fs.Int32Var(&c.IPTablesMasqueradeBit, "iptables-masquerade-bit", c.IPTablesMasqueradeBit, "The bit of the fwmark space to mark packets for SNAT. Must be within the range [0, 31]. Please match this parameter with corresponding parameter in kube-proxy.") fs.Int32Var(&c.IPTablesDropBit, "iptables-drop-bit", c.IPTablesDropBit, "The bit of the fwmark space to mark packets for dropping. Must be within the range [0, 31].") + fs.StringVar(&c.ContainerLogMaxSize, "container-log-max-size", c.ContainerLogMaxSize, " Set the maximum size (e.g. 10Mi) of container log file before it is rotated.") + fs.Int32Var(&c.ContainerLogMaxFiles, "container-log-max-files", c.ContainerLogMaxFiles, " Set the maximum number of container log files that can be present for a container. The number must be >= 2.") // Flags intended for testing, not recommended used in production environments. fs.Int64Var(&c.MaxOpenFiles, "max-open-files", c.MaxOpenFiles, "Number of files that can be opened by Kubelet process.") diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2c7f06b919a..d621d2e963c 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -250,6 +250,12 @@ const ( // // Implement TokenRequest endpoint on service account resources. 
TokenRequest utilfeature.Feature = "TokenRequest" + + // owner: @Random-Liu + // alpha: v1.10 + // + // Enable container log rotation for cri container runtime + CRIContainerLogRotation utilfeature.Feature = "CRIContainerLogRotation" ) func init() { @@ -293,6 +299,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS HyperVContainer: {Default: false, PreRelease: utilfeature.Alpha}, NoDaemonSetScheduler: {Default: false, PreRelease: utilfeature.Alpha}, TokenRequest: {Default: false, PreRelease: utilfeature.Alpha}, + CRIContainerLogRotation: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 79ed5b08181..51244136aaa 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -61,6 +61,7 @@ go_library( "//pkg/kubelet/kubeletconfig:go_default_library", "//pkg/kubelet/kuberuntime:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", + "//pkg/kubelet/logs:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics/collectors:go_default_library", "//pkg/kubelet/mountpod:go_default_library", @@ -181,6 +182,7 @@ go_test( "//pkg/kubelet/gpu:go_default_library", "//pkg/kubelet/images:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", + "//pkg/kubelet/logs:go_default_library", "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/pleg:go_default_library", @@ -268,6 +270,7 @@ filegroup( "//pkg/kubelet/kuberuntime:all-srcs", "//pkg/kubelet/leaky:all-srcs", "//pkg/kubelet/lifecycle:all-srcs", + "//pkg/kubelet/logs:all-srcs", "//pkg/kubelet/metrics:all-srcs", "//pkg/kubelet/mountpod:all-srcs", "//pkg/kubelet/network:all-srcs", diff --git a/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go b/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go index 
4c930f77888..653c72603c4 100644 --- a/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go +++ b/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go @@ -3739,7 +3739,9 @@ type RuntimeServiceClient interface { UpdateContainerResources(ctx context.Context, in *UpdateContainerResourcesRequest, opts ...grpc.CallOption) (*UpdateContainerResourcesResponse, error) // ReopenContainerLog asks runtime to reopen the stdout/stderr log file // for the container. This is often called after the log file has been - // rotated. + // rotated. If the container is not running, container runtime can choose + // to either create a new log file and return nil, or return an error. + // Once it returns error, new container log file MUST NOT be created. ReopenContainerLog(ctx context.Context, in *ReopenContainerLogRequest, opts ...grpc.CallOption) (*ReopenContainerLogResponse, error) // ExecSync runs a command in a container synchronously. ExecSync(ctx context.Context, in *ExecSyncRequest, opts ...grpc.CallOption) (*ExecSyncResponse, error) @@ -4017,7 +4019,9 @@ type RuntimeServiceServer interface { UpdateContainerResources(context.Context, *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error) // ReopenContainerLog asks runtime to reopen the stdout/stderr log file // for the container. This is often called after the log file has been - // rotated. + // rotated. If the container is not running, container runtime can choose + // to either create a new log file and return nil, or return an error. + // Once it returns error, new container log file MUST NOT be created. ReopenContainerLog(context.Context, *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error) // ExecSync runs a command in a container synchronously. 
ExecSync(context.Context, *ExecSyncRequest) (*ExecSyncResponse, error) diff --git a/pkg/kubelet/apis/cri/runtime/v1alpha2/api.proto b/pkg/kubelet/apis/cri/runtime/v1alpha2/api.proto index d6f1aba5641..257cfbc2e9f 100644 --- a/pkg/kubelet/apis/cri/runtime/v1alpha2/api.proto +++ b/pkg/kubelet/apis/cri/runtime/v1alpha2/api.proto @@ -66,7 +66,9 @@ service RuntimeService { rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {} // ReopenContainerLog asks runtime to reopen the stdout/stderr log file // for the container. This is often called after the log file has been - // rotated. + // rotated. If the container is not running, container runtime can choose + // to either create a new log file and return nil, or return an error. + // Once it returns error, new container log file MUST NOT be created. rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {} // ExecSync runs a command in a container synchronously. diff --git a/pkg/kubelet/apis/cri/services.go b/pkg/kubelet/apis/cri/services.go index 6657f6f9e58..d8387dc2fc2 100644 --- a/pkg/kubelet/apis/cri/services.go +++ b/pkg/kubelet/apis/cri/services.go @@ -53,7 +53,8 @@ type ContainerManager interface { // Attach prepares a streaming endpoint to attach to a running container, and returns the address. Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) // ReopenContainerLog asks runtime to reopen the stdout/stderr log file - // for the container. + // for the container. If it returns error, new container log file MUST NOT + // be created. 
ReopenContainerLog(ContainerID string) error } diff --git a/pkg/kubelet/apis/cri/testing/fake_runtime_service.go b/pkg/kubelet/apis/cri/testing/fake_runtime_service.go index 02799c09fa4..03be06be306 100644 --- a/pkg/kubelet/apis/cri/testing/fake_runtime_service.go +++ b/pkg/kubelet/apis/cri/testing/fake_runtime_service.go @@ -49,6 +49,7 @@ type FakeRuntimeService struct { sync.Mutex Called []string + Errors map[string][]error FakeStatus *runtimeapi.RuntimeStatus Containers map[string]*FakeContainer @@ -101,9 +102,29 @@ func (r *FakeRuntimeService) AssertCalls(calls []string) error { return nil } +func (r *FakeRuntimeService) InjectError(f string, err error) { + r.Lock() + defer r.Unlock() + r.Errors[f] = append(r.Errors[f], err) +} + +// popError dequeues and returns the oldest error injected for f, or nil if none is queued; the caller must hold the lock. +func (r *FakeRuntimeService) popError(f string) error { + if r.Errors == nil { + return nil + } + errs := r.Errors[f] + if len(errs) == 0 { + return nil + } + r.Errors[f] = errs[1:] + return errs[0] +} + func NewFakeRuntimeService() *FakeRuntimeService { return &FakeRuntimeService{ Called: make([]string, 0), + Errors: make(map[string][]error), Containers: make(map[string]*FakeContainer), Sandboxes: make(map[string]*FakePodSandbox), FakeContainerStats: make(map[string]*runtimeapi.ContainerStats), @@ -465,5 +486,10 @@ func (r *FakeRuntimeService) ReopenContainerLog(containerID string) error { defer r.Unlock() r.Called = append(r.Called, "ReopenContainerLog") + + if err := r.popError("ReopenContainerLog"); err != nil { + return err + } + return nil } diff --git a/pkg/kubelet/apis/kubeletconfig/fuzzer/fuzzer.go b/pkg/kubelet/apis/kubeletconfig/fuzzer/fuzzer.go index 9727632827e..506a354d329 100644 --- a/pkg/kubelet/apis/kubeletconfig/fuzzer/fuzzer.go +++ b/pkg/kubelet/apis/kubeletconfig/fuzzer/fuzzer.go @@ -90,6 +90,8 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} { obj.CgroupDriver = "cgroupfs" obj.EnforceNodeAllocatable = v1beta1.DefaultNodeAllocatableEnforcement
obj.ManifestURLHeader = make(map[string][]string) + obj.ContainerLogMaxFiles = 5 + obj.ContainerLogMaxSize = "10Mi" }, } } diff --git a/pkg/kubelet/apis/kubeletconfig/helpers_test.go b/pkg/kubelet/apis/kubeletconfig/helpers_test.go index 82cd5bb7151..83e93915e1e 100644 --- a/pkg/kubelet/apis/kubeletconfig/helpers_test.go +++ b/pkg/kubelet/apis/kubeletconfig/helpers_test.go @@ -152,6 +152,8 @@ var ( "CgroupsPerQOS", "ClusterDNS[*]", "ClusterDomain", + "ContainerLogMaxFiles", + "ContainerLogMaxSize", "ContentType", "EnableContentionProfiling", "EnableControllerAttachDetach", diff --git a/pkg/kubelet/apis/kubeletconfig/types.go b/pkg/kubelet/apis/kubeletconfig/types.go index 419a4a5b032..fb2355fb228 100644 --- a/pkg/kubelet/apis/kubeletconfig/types.go +++ b/pkg/kubelet/apis/kubeletconfig/types.go @@ -240,6 +240,10 @@ type KubeletConfiguration struct { FeatureGates map[string]bool // Tells the Kubelet to fail to start if swap is enabled on the node. FailSwapOn bool + // A quantity defines the maximum size of the container log file before it is rotated. For example: "5Mi" or "256Ki". + ContainerLogMaxSize string + // Maximum number of container log files that can be present for a container. 
+ ContainerLogMaxFiles int32 /* following flags are meant for Node Allocatable */ diff --git a/pkg/kubelet/apis/kubeletconfig/v1beta1/defaults.go b/pkg/kubelet/apis/kubeletconfig/v1beta1/defaults.go index 3c444fc76ee..b8d6bb32f73 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1beta1/defaults.go +++ b/pkg/kubelet/apis/kubeletconfig/v1beta1/defaults.go @@ -192,6 +192,12 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) { if obj.FailSwapOn == nil { obj.FailSwapOn = utilpointer.BoolPtr(true) } + if obj.ContainerLogMaxSize == "" { + obj.ContainerLogMaxSize = "10Mi" + } + if obj.ContainerLogMaxFiles == nil { + obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5) + } if obj.EnforceNodeAllocatable == nil { obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement } diff --git a/pkg/kubelet/apis/kubeletconfig/v1beta1/types.go b/pkg/kubelet/apis/kubeletconfig/v1beta1/types.go index 4036baed3f2..955fb3f925a 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1beta1/types.go +++ b/pkg/kubelet/apis/kubeletconfig/v1beta1/types.go @@ -390,6 +390,14 @@ type KubeletConfiguration struct { // Default: true // +optional FailSwapOn *bool `json:"failSwapOn,omitempty"` + // A quantity defines the maximum size of the container log file before it is rotated. For example: "5Mi" or "256Ki". + // Default: "10Mi" + // +optional + ContainerLogMaxSize string `json:"containerLogMaxSize,omitempty"` + // Maximum number of container log files that can be present for a container. 
+ // Default: 5 + // +optional + ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"` /* following flags are meant for Node Allocatable */ diff --git a/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.conversion.go b/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.conversion.go index 9aa24cd2128..85a0bfd7a12 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.conversion.go @@ -244,6 +244,10 @@ func autoConvert_v1beta1_KubeletConfiguration_To_kubeletconfig_KubeletConfigurat if err := v1.Convert_Pointer_bool_To_bool(&in.FailSwapOn, &out.FailSwapOn, s); err != nil { return err } + out.ContainerLogMaxSize = in.ContainerLogMaxSize + if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil { + return err + } out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved)) out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved)) out.SystemReservedCgroup = in.SystemReservedCgroup @@ -361,6 +365,10 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1beta1_KubeletConfigurat if err := v1.Convert_bool_To_Pointer_bool(&in.FailSwapOn, &out.FailSwapOn, s); err != nil { return err } + out.ContainerLogMaxSize = in.ContainerLogMaxSize + if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil { + return err + } out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved)) out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved)) out.SystemReservedCgroup = in.SystemReservedCgroup diff --git a/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.deepcopy.go b/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.deepcopy.go index 14811c68d60..d0f8d5f748b 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/kubeletconfig/v1beta1/zz_generated.deepcopy.go @@ 
-311,6 +311,15 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { **out = **in } } + if in.ContainerLogMaxFiles != nil { + in, out := &in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles + if *in == nil { + *out = nil + } else { + *out = new(int32) + **out = **in + } + } if in.SystemReserved != nil { in, out := &in.SystemReserved, &out.SystemReserved *out = make(map[string]string, len(*in)) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 667eeae8538..8d997e13f51 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -74,6 +74,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/kuberuntime" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/kubernetes/pkg/kubelet/network" @@ -758,6 +759,21 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } klet.imageManager = imageManager + if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) { + // setup containerLogManager for CRI container runtime + containerLogManager, err := logs.NewContainerLogManager( + klet.runtimeService, + kubeCfg.ContainerLogMaxSize, + int(kubeCfg.ContainerLogMaxFiles), + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize container log manager: %v", err) + } + klet.containerLogManager = containerLogManager + } else { + klet.containerLogManager = logs.NewStubContainerLogManager() + } + klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet) if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kubeDeps.TLSOptions != nil { @@ -993,6 +1009,9 @@ type Kubelet struct { // Manager for image garbage collection. imageManager images.ImageGCManager + // Manager for container logs. 
+ containerLogManager logs.ContainerLogManager + // Secret manager. secretManager secret.Manager @@ -1335,6 +1354,9 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { // Fail kubelet and rely on the babysitter to retry starting kubelet. glog.Fatalf("Failed to start ContainerManager %v", err) } + // container log manager must start after container runtime is up to retrieve information from container runtime + // and inform container to reopen log file after log rotation. + kl.containerLogManager.Start() } // Run starts the kubelet reacting to config updates diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 18746c67d6a..ad59b34d399 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -52,6 +52,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/gpu" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/logs" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/pleg" @@ -262,6 +263,7 @@ func newTestKubeletWithImageList( fakeImageService: fakeRuntime, ImageGCManager: imageGCManager, } + kubelet.containerLogManager = logs.NewStubContainerLogManager() fakeClock := clock.NewFakeClock(time.Now()) kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute) kubelet.backOff.Clock = fakeClock diff --git a/pkg/kubelet/logs/BUILD b/pkg/kubelet/logs/BUILD new file mode 100644 index 00000000000..bd16b1c23d9 --- /dev/null +++ b/pkg/kubelet/logs/BUILD @@ -0,0 +1,46 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "container_log_manager.go", + "container_log_manager_stub.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubelet/logs", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/apis/cri:go_default_library", + "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", + 
"//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["container_log_manager_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", + "//pkg/kubelet/apis/cri/testing:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubelet/logs/container_log_manager.go b/pkg/kubelet/logs/container_log_manager.go new file mode 100644 index 00000000000..baedd6c4c4c --- /dev/null +++ b/pkg/kubelet/logs/container_log_manager.go @@ -0,0 +1,387 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package logs + +import ( + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" + internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" +) + +const ( + // logMonitorPeriod is the period container log manager monitors + // container logs and performs log rotation. + logMonitorPeriod = 10 * time.Second + // timestampFormat is the format of the timestamp suffix for rotated log. + // See https://golang.org/pkg/time/#Time.Format. + timestampFormat = "20060102-150405" + // compressSuffix is the suffix for compressed log. + compressSuffix = ".gz" + // tmpSuffix is the suffix for temporary file. + tmpSuffix = ".tmp" +) + +// ContainerLogManager manages lifecycle of all container logs. +// +// Implementation is thread-safe. +type ContainerLogManager interface { + // TODO(random-liu): Add RotateLogs function and call it under disk pressure. + // Start container log manager. + Start() +} + +// LogRotatePolicy is a policy for container log rotation. The policy applies to all +// containers managed by kubelet. +type LogRotatePolicy struct { + // MaxSize in bytes of the container log file before it is rotated. + // NOTE(review): negative values do not disable rotation; parseMaxSize rejects them. + MaxSize int64 + // MaxFiles is the maximum number of log files that can be present. + // If rotating the logs creates excess files, the oldest file is removed. + MaxFiles int +} + +// GetAllLogs gets all inuse (rotated/compressed) logs for a specific container log. +// Returned logs are sorted in oldest to newest order. +// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`. +func GetAllLogs(log string) ([]string, error) { + // pattern is used to match all rotated files.
+ pattern := fmt.Sprintf("%s.*", log) + logs, err := filepath.Glob(pattern) + if err != nil { + return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err) + } + inuse, _ := filterUnusedLogs(logs) + sort.Strings(inuse) + return append(inuse, log), nil +} + +// compressReadCloser wraps gzip.Reader with a function to close the file handle. +type compressReadCloser struct { + f *os.File + *gzip.Reader +} + +func (rc *compressReadCloser) Close() error { + ferr := rc.f.Close() + rerr := rc.Reader.Close() + if ferr != nil { + return ferr + } + if rerr != nil { + return rerr + } + return nil +} + +// UncompressLog uncompresses a compressed log and returns a ReadCloser for the +// stream of the uncompressed content. +// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`. +func UncompressLog(log string) (_ io.ReadCloser, retErr error) { + if !strings.HasSuffix(log, compressSuffix) { + return nil, fmt.Errorf("log is not compressed") + } + f, err := os.Open(log) + if err != nil { + return nil, fmt.Errorf("failed to open log: %v", err) + } + defer func() { + if retErr != nil { + f.Close() + } + }() + r, err := gzip.NewReader(f) + if err != nil { + return nil, fmt.Errorf("failed to create gzip reader: %v", err) + } + return &compressReadCloser{f: f, Reader: r}, nil +} + +// parseMaxSize parses quantity string to int64 max size in bytes. +func parseMaxSize(size string) (int64, error) { + quantity, err := resource.ParseQuantity(size) + if err != nil { + return 0, err + } + maxSize, ok := quantity.AsInt64() + if !ok { + return 0, fmt.Errorf("invalid max log size") + } + if maxSize < 0 { + return 0, fmt.Errorf("negative max log size %d", maxSize) + } + return maxSize, nil +} + +type containerLogManager struct { + runtimeService internalapi.RuntimeService + policy LogRotatePolicy + clock clock.Clock +} + +// NewContainerLogManager creates a new container log manager.
+func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) { + if maxFiles <= 1 { + return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles) + } + parsedMaxSize, err := parseMaxSize(maxSize) + if err != nil { + return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err) + } + // Build the manager with the validated rotation policy and a real clock. + return &containerLogManager{ + runtimeService: runtimeService, + policy: LogRotatePolicy{ + MaxSize: parsedMaxSize, + MaxFiles: maxFiles, + }, + clock: clock.RealClock{}, + }, nil +} + +// Start launches a goroutine that runs rotateLogs via wait.Forever with logMonitorPeriod. +func (c *containerLogManager) Start() { + // Start a goroutine that periodically does container log rotation. + go wait.Forever(func() { + if err := c.rotateLogs(); err != nil { + glog.Errorf("Failed to rotate container logs: %v", err) + } + }, logMonitorPeriod) +} + +func (c *containerLogManager) rotateLogs() error { + // TODO(#59998): Use kubelet pod cache. + containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{}) + if err != nil { + return fmt.Errorf("failed to list containers: %v", err) + } + // NOTE(random-liu): Figure out whether we need to rotate container logs in parallel. + for _, container := range containers { + // Only rotate logs for running containers. Non-running containers won't + // generate new output, it doesn't make sense to keep an empty latest log. + if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING { + continue + } + id := container.GetId() + // Note that we should not block log rotate for an error of a single container.
+ status, err := c.runtimeService.ContainerStatus(id) + if err != nil { + glog.Errorf("Failed to get container status for %q: %v", id, err) + continue + } + path := status.GetLogPath() + info, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + glog.Errorf("Failed to stat container log %q: %v", path, err) + continue + } + // In rotateLatestLog, there are several cases that we may + // lose original container log after ReopenContainerLog fails. + // We try to recover it by reopening container log. + if err := c.runtimeService.ReopenContainerLog(id); err != nil { + glog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, path, err) + continue + } + // The container log should be recovered. + info, err = os.Stat(path) + if err != nil { + glog.Errorf("Failed to stat container log %q after reopen: %v", path, err) + continue + } + } + if info.Size() < c.policy.MaxSize { + continue + } + // The log has reached MaxSize; perform log rotation. + if err := c.rotateLog(id, path); err != nil { + glog.Errorf("Failed to rotate log %q for container %q: %v", path, id, err) + continue + } + } + return nil +} + +func (c *containerLogManager) rotateLog(id, log string) error { + // pattern is used to match all rotated files. + pattern := fmt.Sprintf("%s.*", log) + logs, err := filepath.Glob(pattern) + if err != nil { + return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err) + } + + logs, err = c.cleanupUnusedLogs(logs) + if err != nil { + return fmt.Errorf("failed to cleanup logs: %v", err) + } + + logs, err = c.removeExcessLogs(logs) + if err != nil { + return fmt.Errorf("failed to remove excess logs: %v", err) + } + + // Compress uncompressed log files.
+ for _, l := range logs { + if strings.HasSuffix(l, compressSuffix) { + continue + } + if err := c.compressLog(l); err != nil { + return fmt.Errorf("failed to compress log %q: %v", l, err) + } + } + + if err := c.rotateLatestLog(id, log); err != nil { + return fmt.Errorf("failed to rotate log %q: %v", log, err) + } + + return nil +} + +// cleanupUnusedLogs cleans up temporary or unused log files generated by previous log rotation +// failure. +func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) { + inuse, unused := filterUnusedLogs(logs) + for _, l := range unused { + if err := os.Remove(l); err != nil { + return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err) + } + } + return inuse, nil +} + +// filterUnusedLogs splits logs into 2 groups, the first group is in-use logs, +// the second group is unused logs. +func filterUnusedLogs(logs []string) (inuse []string, unused []string) { + for _, l := range logs { + if isInUse(l, logs) { + inuse = append(inuse, l) + } else { + unused = append(unused, l) + } + } + return inuse, unused +} + +// isInUse checks whether a container log file is still in use. +func isInUse(l string, logs []string) bool { + // All temporary files are not in use. + if strings.HasSuffix(l, tmpSuffix) { + return false + } + // All compressed logs are in use. + if strings.HasSuffix(l, compressSuffix) { + return true + } + // Files that have already been compressed are not in use. + for _, another := range logs { + if l+compressSuffix == another { + return false + } + } + return true +} + +// removeExcessLogs removes old logs to make sure there are only at most MaxFiles log files. +func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) { + // Sort log files in oldest to newest order. + sort.Strings(logs) + // Container will create a new log file, and we'll rotate the latest log file. + // Other than those 2 files, we can have at most MaxFiles-2 rotated log files.
+ // Keep MaxFiles-2 files by removing old files. + // We should remove from oldest to newest, so as not to break ongoing `kubectl logs`. + maxRotatedFiles := c.policy.MaxFiles - 2 + if maxRotatedFiles < 0 { + maxRotatedFiles = 0 + } + i := 0 + for ; i < len(logs)-maxRotatedFiles; i++ { + if err := os.Remove(logs[i]); err != nil { + return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err) + } + } + logs = logs[i:] + return logs, nil +} + +// compressLog gzips log into log.tmp, renames that to log.gz, then removes log. NOTE(review): w.Close is deferred, so the gzip stream is finalized only after the rename and removal, and its error is dropped. +func (c *containerLogManager) compressLog(log string) error { + r, err := os.Open(log) + if err != nil { + return fmt.Errorf("failed to open log %q: %v", log, err) + } + defer r.Close() + tmpLog := log + tmpSuffix + f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err) + } + defer func() { + // Best effort cleanup of tmpLog. + os.Remove(tmpLog) + }() + defer f.Close() + w := gzip.NewWriter(f) + defer w.Close() + if _, err := io.Copy(w, r); err != nil { + return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err) + } + compressedLog := log + compressSuffix + if err := os.Rename(tmpLog, compressedLog); err != nil { + return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err) + } + // Remove old log file. + if err := os.Remove(log); err != nil { + return fmt.Errorf("failed to remove log %q after compress: %v", log, err) + } + return nil +} + +// rotateLatestLog rotates latest log without compression, so that container can still write +// and fluentd can finish reading.
+func (c *containerLogManager) rotateLatestLog(id, log string) error { + timestamp := c.clock.Now().Format(timestampFormat) + rotated := fmt.Sprintf("%s.%s", log, timestamp) + if err := os.Rename(log, rotated); err != nil { + return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err) + } + if err := c.runtimeService.ReopenContainerLog(id); err != nil { + // Rename the rotated log back, so that we can try rotating it again + // next round. + // If kubelet gets restarted at this point, we'll lose original log. + if renameErr := os.Rename(rotated, log); renameErr != nil { + // This shouldn't happen. + // Report an error if this happens, because we will lose original + // log. + glog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, err) + } + return fmt.Errorf("failed to reopen container log %q: %v", id, err) + } + return nil +} diff --git a/pkg/kubelet/logs/container_log_manager_stub.go b/pkg/kubelet/logs/container_log_manager_stub.go new file mode 100644 index 00000000000..a6e0729f19d --- /dev/null +++ b/pkg/kubelet/logs/container_log_manager_stub.go @@ -0,0 +1,26 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logs + +type containerLogManagerStub struct{} + +func (*containerLogManagerStub) Start() {} + +// NewStubContainerLogManager returns an empty ContainerLogManager which does nothing. 
+func NewStubContainerLogManager() ContainerLogManager {
+	return &containerLogManagerStub{}
+}
diff --git a/pkg/kubelet/logs/container_log_manager_test.go b/pkg/kubelet/logs/container_log_manager_test.go
new file mode 100644
index 00000000000..9080394ac86
--- /dev/null
+++ b/pkg/kubelet/logs/container_log_manager_test.go
@@ -0,0 +1,324 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package logs
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"k8s.io/apimachinery/pkg/util/clock"
+	runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
+	critest "k8s.io/kubernetes/pkg/kubelet/apis/cri/testing"
+)
+
+// TestGetAllLogs creates a mix of live, rotated, compressed, unused and
+// temporary log files in a temp directory and checks that GetAllLogs returns
+// exactly the entries in expectLogs, in that order, with the live log last.
+func TestGetAllLogs(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test-get-all-logs")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+	testLogs := []string{
+		"test-log.11111111-111111.gz",
+		"test-log",
+		"test-log.00000000-000000.gz",
+		"test-log.19900322-000000.gz",
+		"test-log.19900322-111111.gz",
+		"test-log.19880620-000000", // unused log
+		"test-log.19880620-000000.gz",
+		"test-log.19880620-111111.gz",
+		"test-log.20180101-000000",
+		"test-log.20180101-000000.tmp", // temporary log
+	}
+	// The unused and temporary logs above are expected to be left out.
+	expectLogs := []string{
+		"test-log.00000000-000000.gz",
+		"test-log.11111111-111111.gz",
+		"test-log.19880620-000000.gz",
+		"test-log.19880620-111111.gz",
+		"test-log.19900322-000000.gz",
+		"test-log.19900322-111111.gz",
+		"test-log.20180101-000000",
+		"test-log",
+	}
+	for i := range testLogs {
+		f, err := os.Create(filepath.Join(dir, testLogs[i]))
+		require.NoError(t, err)
+		f.Close()
+	}
+	got, err := GetAllLogs(filepath.Join(dir, "test-log"))
+	assert.NoError(t, err)
+	// GetAllLogs is compared against full paths, so prefix the expectations.
+	for i := range expectLogs {
+		expectLogs[i] = filepath.Join(dir, expectLogs[i])
+	}
+	assert.Equal(t, expectLogs, got)
+}
+
+// TestRotateLogs runs a single rotateLogs pass against a fake runtime service
+// holding four containers (small log, oversized log, oversized log with
+// existing rotated files, and an exited container) and checks the resulting
+// directory listing.
+func TestRotateLogs(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test-rotate-logs")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	const (
+		testMaxFiles = 3
+		testMaxSize  = 10
+	)
+	now := time.Now()
+	f := critest.NewFakeRuntimeService()
+	c := &containerLogManager{
+		runtimeService: f,
+		policy: LogRotatePolicy{
+			MaxSize:  testMaxSize,
+			MaxFiles: testMaxFiles,
+		},
+		// A fake clock pinned to now makes the rotation timestamp predictable.
+		clock: clock.NewFakeClock(now),
+	}
+	testLogs := []string{
+		"test-log-1",
+		"test-log-2",
+		"test-log-3",
+		"test-log-4",
+		"test-log-3.00000000-000001",
+		"test-log-3.00000000-000000.gz",
+	}
+	// testContent[i] is written to testLogs[i]; only the first is shorter
+	// than testMaxSize bytes.
+	testContent := []string{
+		"short",
+		"longer than 10 bytes",
+		"longer than 10 bytes",
+		"longer than 10 bytes",
+		"the length doesn't matter",
+		"the length doesn't matter",
+	}
+	for i := range testLogs {
+		f, err := os.Create(filepath.Join(dir, testLogs[i]))
+		require.NoError(t, err)
+		_, err = f.Write([]byte(testContent[i]))
+		require.NoError(t, err)
+		f.Close()
+	}
+	testContainers := []*critest.FakeContainer{
+		{
+			ContainerStatus: runtimeapi.ContainerStatus{
+				Id:      "container-not-need-rotate",
+				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
+				LogPath: filepath.Join(dir, testLogs[0]),
+			},
+		},
+		{
+			ContainerStatus: runtimeapi.ContainerStatus{
+				Id:      "container-need-rotate",
+				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
+				LogPath: filepath.Join(dir, testLogs[1]),
+			},
+		},
+		{
+			ContainerStatus: runtimeapi.ContainerStatus{
+				Id:      "container-has-excess-log",
+				State:   runtimeapi.ContainerState_CONTAINER_RUNNING,
+				LogPath: filepath.Join(dir, testLogs[2]),
+			},
+		},
+		{
+			ContainerStatus: runtimeapi.ContainerStatus{
+				Id:      "container-is-not-running",
+				State:   runtimeapi.ContainerState_CONTAINER_EXITED,
+				LogPath: filepath.Join(dir, testLogs[3]),
+			},
+		},
+	}
+	f.SetFakeContainers(testContainers)
+	require.NoError(t, c.rotateLogs())
+
+	timestamp := now.Format(timestampFormat)
+	// ioutil.ReadDir returns entries sorted by file name, which fixes the
+	// indices used in the assertions below.
+	logs, err := ioutil.ReadDir(dir)
+	require.NoError(t, err)
+	assert.Len(t, logs, 5)
+	assert.Equal(t, testLogs[0], logs[0].Name())
+	assert.Equal(t, testLogs[1]+"."+timestamp, logs[1].Name())
+	assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name())
+	assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name())
+	assert.Equal(t, testLogs[3], logs[4].Name())
+}
+
+// TestCleanupUnusedLog checks that cleanupUnusedLogs keeps the regular and
+// the compressed log, both in its return value and on disk, and drops the
+// temporary and the unused log.
+func TestCleanupUnusedLog(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test-cleanup-unused-log")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	testLogs := []string{
+		"test-log-1",     // regular log
+		"test-log-1.tmp", // temporary log
+		"test-log-2",     // unused log
+		"test-log-2.gz",  // compressed log
+	}
+
+	for i := range testLogs {
+		testLogs[i] = filepath.Join(dir, testLogs[i])
+		f, err := os.Create(testLogs[i])
+		require.NoError(t, err)
+		f.Close()
+	}
+
+	c := &containerLogManager{}
+	got, err := c.cleanupUnusedLogs(testLogs)
+	require.NoError(t, err)
+	assert.Len(t, got, 2)
+	assert.Equal(t, []string{testLogs[0], testLogs[3]}, got)
+
+	logs, err := ioutil.ReadDir(dir)
+	require.NoError(t, err)
+	assert.Len(t, logs, 2)
+	assert.Equal(t, testLogs[0], filepath.Join(dir, logs[0].Name()))
+	assert.Equal(t, testLogs[3], filepath.Join(dir, logs[1].Name()))
+}
+
+// TestRemoveExcessLog is a table-driven test of removeExcessLogs: for each
+// MaxFiles value it creates four log files and checks which ones are
+// returned and which ones remain on disk.
+func TestRemoveExcessLog(t *testing.T) {
+	for desc, test := range map[string]struct {
+		max    int
+		expect []string
+	}{
+		"MaxFiles equal to 2": {
+			max:    2,
+			expect: []string{},
+		},
+		"MaxFiles more than 2": {
+			max:    3,
+			expect: []string{"test-log-4"},
+		},
+		"MaxFiles more than log file number": {
+			max:    6,
+			expect: []string{"test-log-1", "test-log-2", "test-log-3", "test-log-4"},
+		},
+	} {
+		t.Logf("TestCase %q", desc)
+		dir, err := ioutil.TempDir("", "test-remove-excess-log")
+		require.NoError(t, err)
+		// NOTE: deferred inside the loop, so every temp dir is removed when
+		// the test function returns rather than at the end of its iteration.
+		defer os.RemoveAll(dir)
+
+		testLogs := []string{"test-log-3", "test-log-1", "test-log-2", "test-log-4"}
+
+		for i := range testLogs {
+			testLogs[i] = filepath.Join(dir, testLogs[i])
+			f, err := os.Create(testLogs[i])
+			require.NoError(t, err)
+			f.Close()
+		}
+
+		c := &containerLogManager{policy: LogRotatePolicy{MaxFiles: test.max}}
+		got, err := c.removeExcessLogs(testLogs)
+		require.NoError(t, err)
+		require.Len(t, got, len(test.expect))
+		for i, name := range test.expect {
+			assert.Equal(t, name, filepath.Base(got[i]))
+		}
+
+		logs, err := ioutil.ReadDir(dir)
+		require.NoError(t, err)
+		require.Len(t, logs, len(test.expect))
+		for i, name := range test.expect {
+			assert.Equal(t, name, logs[i].Name())
+		}
+	}
+}
+
+// TestCompressLog checks that compressLog produces the compressed file,
+// leaves neither the temporary nor the original file behind, and that the
+// compressed file decompresses back to the original content.
+func TestCompressLog(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test-compress-log")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	testFile, err := ioutil.TempFile(dir, "test-rotate-latest-log")
+	require.NoError(t, err)
+	defer testFile.Close()
+	testContent := "test log content"
+	_, err = testFile.Write([]byte(testContent))
+	require.NoError(t, err)
+
+	testLog := testFile.Name()
+	c := &containerLogManager{}
+	require.NoError(t, c.compressLog(testLog))
+	_, err = os.Stat(testLog + compressSuffix)
+	assert.NoError(t, err, "log should be compressed")
+	_, err = os.Stat(testLog + tmpSuffix)
+	assert.Error(t, err, "temporary log should be renamed")
+	_, err = os.Stat(testLog)
+	assert.Error(t, err, "original log should be removed")
+
+	// Round-trip the compressed file to verify its content.
+	rc, err := UncompressLog(testLog + compressSuffix)
+	require.NoError(t, err)
+	defer rc.Close()
+	var buf bytes.Buffer
+	_, err = io.Copy(&buf, rc)
+	require.NoError(t, err)
+	assert.Equal(t, testContent, buf.String())
+}
+
+// TestRotateLatestLog is a table-driven test of rotateLatestLog covering a
+// successful rotation and a ReopenContainerLog failure, after which the
+// original log is expected to be back in place.
+func TestRotateLatestLog(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test-rotate-latest-log")
+	require.NoError(t, err)
+	defer os.RemoveAll(dir)
+
+	for desc, test := range map[string]struct {
+		runtimeError   error
+		maxFiles       int
+		expectError    bool
+		expectOriginal bool
+		expectRotated  bool
+	}{
+		"should successfully rotate log when MaxFiles is 2": {
+			maxFiles:       2,
+			expectError:    false,
+			expectOriginal: false,
+			expectRotated:  true,
+		},
+		"should restore original log when ReopenContainerLog fails": {
+			runtimeError:   fmt.Errorf("random error"),
+			maxFiles:       2,
+			expectError:    true,
+			expectOriginal: true,
+			expectRotated:  false,
+		},
+	} {
+		t.Logf("TestCase %q", desc)
+		now := time.Now()
+		f := critest.NewFakeRuntimeService()
+		c := &containerLogManager{
+			runtimeService: f,
+			policy:         LogRotatePolicy{MaxFiles: test.maxFiles},
+			clock:          clock.NewFakeClock(now),
+		}
+		if test.runtimeError != nil {
+			f.InjectError("ReopenContainerLog", test.runtimeError)
+		}
+		testFile, err := ioutil.TempFile(dir, "test-rotate-latest-log")
+		require.NoError(t, err)
+		// NOTE: deferred inside the loop, so the files are closed when the
+		// test function returns rather than at the end of each iteration.
+		defer testFile.Close()
+		testLog := testFile.Name()
+		// Same name that rotateLatestLog derives from the fake clock.
+		rotatedLog := fmt.Sprintf("%s.%s", testLog, now.Format(timestampFormat))
+		err = c.rotateLatestLog("test-id", testLog)
+		assert.Equal(t, test.expectError, err != nil)
+		_, err = os.Stat(testLog)
+		assert.Equal(t, test.expectOriginal, err == nil)
+		_, err = os.Stat(rotatedLog)
+		assert.Equal(t, test.expectRotated, err == nil)
+		assert.NoError(t, f.AssertCalls([]string{"ReopenContainerLog"}))
+	}
+}
diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD
index 60680043973..69768dfebb9 100644
--- a/test/e2e_node/BUILD
+++ b/test/e2e_node/BUILD
@@ -84,6 +84,7 @@ go_test(
     name = "go_default_test",
     srcs = [
         "apparmor_test.go",
+        "container_log_rotation_test.go",
         "cpu_manager_test.go",
         "critical_pod_test.go",
         "docker_test.go",
@@ -132,6 +133,7 @@ go_test(
         "//pkg/kubelet/images:go_default_library",
         "//pkg/kubelet/kubeletconfig:go_default_library",
         "//pkg/kubelet/kubeletconfig/status:go_default_library",
+        "//pkg/kubelet/logs:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/security/apparmor:go_default_library",
diff --git a/test/e2e_node/container_log_rotation_test.go b/test/e2e_node/container_log_rotation_test.go
new file mode 100644
index 00000000000..61f99451a7c
--- /dev/null
+++ b/test/e2e_node/container_log_rotation_test.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e_node
+
+import (
+	"time"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	kubelogs "k8s.io/kubernetes/pkg/kubelet/logs"
+	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// Kubelet log rotation settings applied for this test, and the polling
+// interval and timeouts used while watching the log file count.
+const (
+	testContainerLogMaxFiles    = 3
+	testContainerLogMaxSize     = "40Ki"
+	rotationPollInterval        = 5 * time.Second
+	rotationEventuallyTimeout   = 3 * time.Minute
+	rotationConsistentlyTimeout = 2 * time.Minute
+)
+
+// ContainerLogRotation starts a pod that logs continuously and verifies that
+// the number of log files reported by kubelogs.GetAllLogs reaches
+// testContainerLogMaxFiles and then never exceeds it.
+var _ = framework.KubeDescribe("ContainerLogRotation [Slow] [Serial] [Disruptive]", func() {
+	f := framework.NewDefaultFramework("container-log-rotation-test")
+	Context("when a container generates a lot of log", func() {
+		BeforeEach(func() {
+			if framework.TestContext.ContainerRuntime != kubetypes.RemoteContainerRuntime {
+				framework.Skipf("Skipping ContainerLogRotation test since the container runtime is not remote")
+			}
+		})
+
+		// Enable the CRIContainerLogRotation feature gate and apply the test
+		// rotation limits to the kubelet configuration.
+		tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
+			initialConfig.FeatureGates[string(features.CRIContainerLogRotation)] = true
+			initialConfig.ContainerLogMaxFiles = testContainerLogMaxFiles
+			initialConfig.ContainerLogMaxSize = testContainerLogMaxSize
+		})
+
+		It("should be rotated and limited to a fixed amount of files", func() {
+			By("create log container")
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-container-log-rotation",
+				},
+				Spec: v1.PodSpec{
+					RestartPolicy: v1.RestartPolicyNever,
+					Containers: []v1.Container{
+						{
+							Name:  "log-container",
+							Image: busyboxImage,
+							Command: []string{
+								"sh",
+								"-c",
+								// ~12Kb/s. Exceeding 40Kb in 4 seconds. Log rotation period is 10 seconds.
+								"while true; do echo hello world; sleep 0.001; done;",
+							},
+						},
+					},
+				},
+			}
+			pod = f.PodClient().CreateSync(pod)
+			By("get container log path")
+			Expect(len(pod.Status.ContainerStatuses)).To(Equal(1))
+			id := kubecontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
+			// The log path is read from the container status returned by the
+			// CRI client.
+			r, _, err := getCRIClient()
+			Expect(err).NotTo(HaveOccurred())
+			status, err := r.ContainerStatus(id)
+			Expect(err).NotTo(HaveOccurred())
+			logPath := status.GetLogPath()
+			By("wait for container log being rotated to max file limit")
+			Eventually(func() (int, error) {
+				logs, err := kubelogs.GetAllLogs(logPath)
+				if err != nil {
+					return 0, err
+				}
+				return len(logs), nil
+			}, rotationEventuallyTimeout, rotationPollInterval).Should(Equal(testContainerLogMaxFiles), "should eventually rotate to max file limit")
+			By("make sure container log number won't exceed max file limit")
+			Consistently(func() (int, error) {
+				logs, err := kubelogs.GetAllLogs(logPath)
+				if err != nil {
+					return 0, err
+				}
+				return len(logs), nil
+			}, rotationConsistentlyTimeout, rotationPollInterval).Should(BeNumerically("<=", testContainerLogMaxFiles), "should never exceed max file limit")
+		})
+	})
+})