Merge pull request #59898 from Random-Liu/add-log-rotation

Automatic merge from submit-queue (batch tested with PRs 60214, 58762, 59898, 59897, 60204). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add CRI container log rotation support

Fixes https://github.com/kubernetes/kubernetes/issues/58823.

This PR:
1) Added `pkg/kubelet/logs/container_log_manager.go` which manages and rotates container logs.
2) Added a feature gate `CRIContainerLogRotation` to enable the alpha feature. And 2 kubelet flags `--container-log-max-size` and `--container-log-max-files` to configure the rotation behavior.
3) Added unit test and node e2e test for container log rotation.

Note that:
1) Container log manager only starts when the container runtime is `remote` (not docker), because we can't implement `ReopenContainerLog` for docker.
2) Rotated logs are compressed with `gzip`.
3) The latest rotated log is not compressed, because fluentd may still be reading the file right after rotation.
4) `kubectl logs` still doesn't support log rotation. This is not a regression: it doesn't support log rotation for docker logs today either. We'll probably fix this in the future. (Issue: https://github.com/kubernetes/kubernetes/issues/59902)

An example of container log directory with `--container-log-max-files=3`:
```console
$ ls -al /var/log/pods/57146449-11ec-11e8-90e1-42010af00002
total 592
drwxr-xr-x 2 root root   4096 Feb 15 01:07 .
drwxr-xr-x 3 root root  12288 Feb 15 01:06 ..
-rw-r----- 1 root root 176870 Feb 15 01:07 log-container_0.log
-rw-r--r-- 1 root root  40239 Feb 15 01:07 log-container_0.log.20180215-010737.gz
-rw-r----- 1 root root 365996 Feb 15 01:07 log-container_0.log.20180215-010747
```

/assign @mtaufen for the config change.
/assign @dashpole @crassirostris for the log change.
/assign @feiskyer for CRI related change.
/cc @yujuhong @feiskyer @abhi @mikebrow @mrunalp @runcom 
/cc @kubernetes/sig-node-pr-reviews @kubernetes/sig-instrumentation-pr-reviews 

**Release note**:

```release-note
[Alpha] Kubelet now supports container log rotation for container runtimes that implement CRI (Container Runtime Interface).
The feature can be enabled with feature gate `CRIContainerLogRotation`.
The flags `--container-log-max-size` and `--container-log-max-files` can be used to configure the rotation behavior.
```
This commit is contained in:
Kubernetes Submit Queue 2018-02-22 22:02:37 -08:00 committed by GitHub
commit b38f1b901f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1004 additions and 4 deletions

View File

@ -527,6 +527,8 @@ func AddKubeletConfigFlags(mainfs *pflag.FlagSet, c *kubeletconfig.KubeletConfig
fs.BoolVar(&c.MakeIPTablesUtilChains, "make-iptables-util-chains", c.MakeIPTablesUtilChains, "If true, kubelet will ensure iptables utility rules are present on host.")
fs.Int32Var(&c.IPTablesMasqueradeBit, "iptables-masquerade-bit", c.IPTablesMasqueradeBit, "The bit of the fwmark space to mark packets for SNAT. Must be within the range [0, 31]. Please match this parameter with corresponding parameter in kube-proxy.")
fs.Int32Var(&c.IPTablesDropBit, "iptables-drop-bit", c.IPTablesDropBit, "The bit of the fwmark space to mark packets for dropping. Must be within the range [0, 31].")
fs.StringVar(&c.ContainerLogMaxSize, "container-log-max-size", c.ContainerLogMaxSize, "<Warning: Alpha feature> Set the maximum size (e.g. 10Mi) of container log file before it is rotated.")
fs.Int32Var(&c.ContainerLogMaxFiles, "container-log-max-files", c.ContainerLogMaxFiles, "<Warning: Alpha feature> Set the maximum number of container log files that can be present for a container. The number must be >= 2.")
// Flags intended for testing, not recommended used in production environments.
fs.Int64Var(&c.MaxOpenFiles, "max-open-files", c.MaxOpenFiles, "Number of files that can be opened by Kubelet process.")

View File

@ -250,6 +250,12 @@ const (
//
// Implement TokenRequest endpoint on service account resources.
TokenRequest utilfeature.Feature = "TokenRequest"
// owner: @Random-Liu
// alpha: v1.10
//
// Enable container log rotation for cri container runtime
CRIContainerLogRotation utilfeature.Feature = "CRIContainerLogRotation"
)
func init() {
@ -293,6 +299,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
HyperVContainer: {Default: false, PreRelease: utilfeature.Alpha},
NoDaemonSetScheduler: {Default: false, PreRelease: utilfeature.Alpha},
TokenRequest: {Default: false, PreRelease: utilfeature.Alpha},
CRIContainerLogRotation: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -61,6 +61,7 @@ go_library(
"//pkg/kubelet/kubeletconfig:go_default_library",
"//pkg/kubelet/kuberuntime:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/logs:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/mountpod:go_default_library",
@ -181,6 +182,7 @@ go_test(
"//pkg/kubelet/gpu:go_default_library",
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/logs:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//pkg/kubelet/network/testing:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
@ -268,6 +270,7 @@ filegroup(
"//pkg/kubelet/kuberuntime:all-srcs",
"//pkg/kubelet/leaky:all-srcs",
"//pkg/kubelet/lifecycle:all-srcs",
"//pkg/kubelet/logs:all-srcs",
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/mountpod:all-srcs",
"//pkg/kubelet/network:all-srcs",

View File

@ -3739,7 +3739,9 @@ type RuntimeServiceClient interface {
UpdateContainerResources(ctx context.Context, in *UpdateContainerResourcesRequest, opts ...grpc.CallOption) (*UpdateContainerResourcesResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. This is often called after the log file has been
// rotated.
// rotated. If the container is not running, container runtime can choose
// to either create a new log file and return nil, or return an error.
// Once it returns error, new container log file MUST NOT be created.
ReopenContainerLog(ctx context.Context, in *ReopenContainerLogRequest, opts ...grpc.CallOption) (*ReopenContainerLogResponse, error)
// ExecSync runs a command in a container synchronously.
ExecSync(ctx context.Context, in *ExecSyncRequest, opts ...grpc.CallOption) (*ExecSyncResponse, error)
@ -4017,7 +4019,9 @@ type RuntimeServiceServer interface {
UpdateContainerResources(context.Context, *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. This is often called after the log file has been
// rotated.
// rotated. If the container is not running, container runtime can choose
// to either create a new log file and return nil, or return an error.
// Once it returns error, new container log file MUST NOT be created.
ReopenContainerLog(context.Context, *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error)
// ExecSync runs a command in a container synchronously.
ExecSync(context.Context, *ExecSyncRequest) (*ExecSyncResponse, error)

View File

@ -66,7 +66,9 @@ service RuntimeService {
rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container. This is often called after the log file has been
// rotated.
// rotated. If the container is not running, container runtime can choose
// to either create a new log file and return nil, or return an error.
// Once it returns error, new container log file MUST NOT be created.
rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}
// ExecSync runs a command in a container synchronously.

View File

@ -53,7 +53,8 @@ type ContainerManager interface {
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
// for the container.
// for the container. If it returns error, new container log file MUST NOT
// be created.
ReopenContainerLog(ContainerID string) error
}

View File

@ -49,6 +49,7 @@ type FakeRuntimeService struct {
sync.Mutex
Called []string
Errors map[string][]error
FakeStatus *runtimeapi.RuntimeStatus
Containers map[string]*FakeContainer
@ -101,9 +102,29 @@ func (r *FakeRuntimeService) AssertCalls(calls []string) error {
return nil
}
// InjectError appends err to the list of errors recorded for the method name
// f. Errors recorded for the same name are kept in insertion order.
func (r *FakeRuntimeService) InjectError(f string, err error) {
	r.Lock()
	defer r.Unlock()
	// popError tolerates a nil Errors map, so a fake built as a struct literal
	// is possible; writing to a nil map would panic, so create it on demand.
	if r.Errors == nil {
		r.Errors = make(map[string][]error)
	}
	r.Errors[f] = append(r.Errors[f], err)
}
// popError removes and returns the oldest error recorded for the method name
// f. It returns nil when no error is pending for f.
//
// The caller of popError must hold the lock.
func (r *FakeRuntimeService) popError(f string) error {
	if r.Errors == nil {
		return nil
	}
	errs := r.Errors[f]
	if len(errs) == 0 {
		return nil
	}
	err := errs[0]
	// Store the shortened queue back into the map. Re-slicing only the local
	// variable would leave the map untouched and the same error would be
	// returned on every call.
	r.Errors[f] = errs[1:]
	return err
}
func NewFakeRuntimeService() *FakeRuntimeService {
return &FakeRuntimeService{
Called: make([]string, 0),
Errors: make(map[string][]error),
Containers: make(map[string]*FakeContainer),
Sandboxes: make(map[string]*FakePodSandbox),
FakeContainerStats: make(map[string]*runtimeapi.ContainerStats),
@ -465,5 +486,10 @@ func (r *FakeRuntimeService) ReopenContainerLog(containerID string) error {
defer r.Unlock()
r.Called = append(r.Called, "ReopenContainerLog")
if err := r.popError("ReopenContainerLog"); err != nil {
return err
}
return nil
}

View File

@ -90,6 +90,8 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.CgroupDriver = "cgroupfs"
obj.EnforceNodeAllocatable = v1beta1.DefaultNodeAllocatableEnforcement
obj.ManifestURLHeader = make(map[string][]string)
obj.ContainerLogMaxFiles = 5
obj.ContainerLogMaxSize = "10Mi"
},
}
}

View File

@ -152,6 +152,8 @@ var (
"CgroupsPerQOS",
"ClusterDNS[*]",
"ClusterDomain",
"ContainerLogMaxFiles",
"ContainerLogMaxSize",
"ContentType",
"EnableContentionProfiling",
"EnableControllerAttachDetach",

View File

@ -240,6 +240,10 @@ type KubeletConfiguration struct {
FeatureGates map[string]bool
// Tells the Kubelet to fail to start if swap is enabled on the node.
FailSwapOn bool
// A quantity defines the maximum size of the container log file before it is rotated. For example: "5Mi" or "256Ki".
ContainerLogMaxSize string
// Maximum number of container log files that can be present for a container.
ContainerLogMaxFiles int32
/* following flags are meant for Node Allocatable */

View File

@ -192,6 +192,12 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.FailSwapOn == nil {
obj.FailSwapOn = utilpointer.BoolPtr(true)
}
if obj.ContainerLogMaxSize == "" {
obj.ContainerLogMaxSize = "10Mi"
}
if obj.ContainerLogMaxFiles == nil {
obj.ContainerLogMaxFiles = utilpointer.Int32Ptr(5)
}
if obj.EnforceNodeAllocatable == nil {
obj.EnforceNodeAllocatable = DefaultNodeAllocatableEnforcement
}

View File

@ -390,6 +390,14 @@ type KubeletConfiguration struct {
// Default: true
// +optional
FailSwapOn *bool `json:"failSwapOn,omitempty"`
// A quantity defines the maximum size of the container log file before it is rotated. For example: "5Mi" or "256Ki".
// Default: "10Mi"
// +optional
ContainerLogMaxSize string `json:"containerLogMaxSize,omitempty"`
// Maximum number of container log files that can be present for a container.
// Default: 5
// +optional
ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"`
/* following flags are meant for Node Allocatable */

View File

@ -244,6 +244,10 @@ func autoConvert_v1beta1_KubeletConfiguration_To_kubeletconfig_KubeletConfigurat
if err := v1.Convert_Pointer_bool_To_bool(&in.FailSwapOn, &out.FailSwapOn, s); err != nil {
return err
}
out.ContainerLogMaxSize = in.ContainerLogMaxSize
if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
out.SystemReservedCgroup = in.SystemReservedCgroup
@ -361,6 +365,10 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1beta1_KubeletConfigurat
if err := v1.Convert_bool_To_Pointer_bool(&in.FailSwapOn, &out.FailSwapOn, s); err != nil {
return err
}
out.ContainerLogMaxSize = in.ContainerLogMaxSize
if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
return err
}
out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
out.SystemReservedCgroup = in.SystemReservedCgroup

View File

@ -311,6 +311,15 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
**out = **in
}
}
if in.ContainerLogMaxFiles != nil {
in, out := &in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles
if *in == nil {
*out = nil
} else {
*out = new(int32)
**out = **in
}
}
if in.SystemReserved != nil {
in, out := &in.SystemReserved, &out.SystemReserved
*out = make(map[string]string, len(*in))

View File

@ -74,6 +74,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -758,6 +759,21 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.imageManager = imageManager
if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
// setup containerLogManager for CRI container runtime
containerLogManager, err := logs.NewContainerLogManager(
klet.runtimeService,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
}
klet.containerLogManager = containerLogManager
} else {
klet.containerLogManager = logs.NewStubContainerLogManager()
}
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kubeDeps.TLSOptions != nil {
@ -993,6 +1009,9 @@ type Kubelet struct {
// Manager for image garbage collection.
imageManager images.ImageGCManager
// Manager for container logs.
containerLogManager logs.ContainerLogManager
// Secret manager.
secretManager secret.Manager
@ -1335,6 +1354,9 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
glog.Fatalf("Failed to start ContainerManager %v", err)
}
// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
kl.containerLogManager.Start()
}
// Run starts the kubelet reacting to config updates

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/gpu"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@ -262,6 +263,7 @@ func newTestKubeletWithImageList(
fakeImageService: fakeRuntime,
ImageGCManager: imageGCManager,
}
kubelet.containerLogManager = logs.NewStubContainerLogManager()
fakeClock := clock.NewFakeClock(time.Now())
kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock

46
pkg/kubelet/logs/BUILD Normal file
View File

@ -0,0 +1,46 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"container_log_manager.go",
"container_log_manager_stub.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/logs",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["container_log_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/cri/testing:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,387 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
import (
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
const (
// logMonitorPeriod is the interval at which the container log manager
// checks container logs and performs log rotation.
logMonitorPeriod = 10 * time.Second
// timestampFormat is the layout of the timestamp suffix appended to a
// rotated log (year, month, day, a dash, then hour, minute, second).
// See https://golang.org/pkg/time/#Time.Format.
timestampFormat = "20060102-150405"
// compressSuffix is the suffix of a compressed log file.
compressSuffix = ".gz"
// tmpSuffix is the suffix of the temporary file written while a log is
// being compressed.
tmpSuffix = ".tmp"
)
// ContainerLogManager manages lifecycle of all container logs.
//
// Implementation is thread-safe.
type ContainerLogManager interface {
// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
// Start starts the container log manager.
Start()
}
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
// containers managed by kubelet.
type LogRotatePolicy struct {
// MaxSize is the size in bytes a container log file may reach before it is
// rotated.
// NOTE(review): this comment used to say a negative number disables
// rotation, but parseMaxSize rejects negative sizes and rotateLogs only
// skips a log whose size is below MaxSize — confirm the intended semantics.
MaxSize int64
// MaxFiles is the maximum number of log files that can be present.
// If rotating the logs creates excess files, the oldest file is removed.
MaxFiles int
}
// GetAllLogs returns every in-use log file belonging to the given container
// log: the rotated (possibly compressed) files sorted by name, which puts
// them in oldest to newest order, followed by the current log itself.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func GetAllLogs(log string) ([]string, error) {
	// Rotated files share the log path as prefix, followed by a dotted suffix.
	rotatedPattern := log + ".*"
	matches, globErr := filepath.Glob(rotatedPattern)
	if globErr != nil {
		return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", rotatedPattern, globErr)
	}
	// Leftovers of failed rotations (unused files) are deliberately dropped.
	rotated, _ := filterUnusedLogs(matches)
	sort.Strings(rotated)
	all := append(rotated, log)
	return all, nil
}
// compressReadCloser couples a gzip.Reader with the file it reads from, so
// that closing the reader also releases the file handle.
type compressReadCloser struct {
	f *os.File
	*gzip.Reader
}

// Close closes the file first and the gzip reader second. Both are always
// closed; when both fail the file error is the one reported.
func (rc *compressReadCloser) Close() error {
	fileErr := rc.f.Close()
	gzipErr := rc.Reader.Close()
	if fileErr != nil {
		return fileErr
	}
	return gzipErr
}
// UncompressLog opens a gzip-compressed log and returns a ReadCloser that
// streams its uncompressed content. The path must end with the compressed
// log suffix. Closing the returned reader also closes the underlying file.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
	if !strings.HasSuffix(log, compressSuffix) {
		return nil, fmt.Errorf("log is not compressed")
	}
	file, openErr := os.Open(log)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open log: %v", openErr)
	}
	gz, gzErr := gzip.NewReader(file)
	if gzErr != nil {
		// The file handle is not handed to the caller on failure; release it.
		file.Close()
		return nil, fmt.Errorf("failed to create gzip reader: %v", gzErr)
	}
	return &compressReadCloser{f: file, Reader: gz}, nil
}
// parseMaxSize converts a quantity string into a maximum size in bytes. It
// fails when the string is not a valid quantity, when the quantity cannot be
// represented as an int64, or when it is negative.
func parseMaxSize(size string) (int64, error) {
	parsed, err := resource.ParseQuantity(size)
	if err != nil {
		return 0, err
	}
	sizeInBytes, representable := parsed.AsInt64()
	switch {
	case !representable:
		return 0, fmt.Errorf("invalid max log size")
	case sizeInBytes < 0:
		return 0, fmt.Errorf("negative max log size %d", sizeInBytes)
	}
	return sizeInBytes, nil
}
// containerLogManager is the ContainerLogManager implementation that rotates
// container logs according to a LogRotatePolicy.
type containerLogManager struct {
// runtimeService is used to list containers, query their status and ask
// the runtime to reopen a container log.
runtimeService internalapi.RuntimeService
// policy holds the size and file-count limits applied to every container.
policy LogRotatePolicy
// clock provides the current time for the timestamp suffix of rotated logs.
clock clock.Clock
}
// NewContainerLogManager builds a container log manager that talks to the
// given runtime service. maxSize is a quantity string giving the size at
// which a log is rotated; maxFiles is the number of log files allowed per
// container and must be greater than 1. An error is returned when either
// value is invalid.
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
	if maxFiles <= 1 {
		return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
	}
	sizeLimit, err := parseMaxSize(maxSize)
	if err != nil {
		return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
	}
	rotatePolicy := LogRotatePolicy{
		MaxSize:  sizeLimit,
		MaxFiles: maxFiles,
	}
	manager := &containerLogManager{
		runtimeService: runtimeService,
		policy:         rotatePolicy,
		clock:          clock.RealClock{},
	}
	return manager, nil
}
// Start launches a background goroutine that runs a log rotation pass every
// logMonitorPeriod. A failed pass is logged and does not stop the loop.
func (c *containerLogManager) Start() {
	rotateOnce := func() {
		if err := c.rotateLogs(); err != nil {
			glog.Errorf("Failed to rotate container logs: %v", err)
		}
	}
	// Periodically perform container log rotation.
	go wait.Forever(rotateOnce, logMonitorPeriod)
}
// rotateLogs performs one rotation pass over all containers reported by the
// runtime. It returns an error only when the containers cannot be listed;
// failures that concern a single container are logged and that container is
// skipped.
func (c *containerLogManager) rotateLogs() error {
// TODO(#59998): Use kubelet pod cache.
containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %v", err)
}
// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
for _, container := range containers {
// Only rotate logs for running containers. Non-running containers won't
// generate new output, it doesn't make sense to keep an empty latest log.
if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
continue
}
id := container.GetId()
// Note that we should not block log rotate for an error of a single container.
status, err := c.runtimeService.ContainerStatus(id)
if err != nil {
glog.Errorf("Failed to get container status for %q: %v", id, err)
continue
}
path := status.GetLogPath()
info, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
glog.Errorf("Failed to stat container log %q: %v", path, err)
continue
}
// In rotateLatestLog, there are several cases that we may
// lose original container log after ReopenContainerLog fails.
// We try to recover it by reopening container log.
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
glog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, path, err)
continue
}
// The container log should be recovered.
info, err = os.Stat(path)
if err != nil {
glog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
continue
}
}
// Logs that have not reached the configured maximum size are left alone.
if info.Size() < c.policy.MaxSize {
continue
}
// Perform log rotation.
if err := c.rotateLog(id, path); err != nil {
glog.Errorf("Failed to rotate log %q for container %q: %v", path, id, err)
continue
}
}
return nil
}
// rotateLog rotates the log of a single container. It first tidies the
// already rotated files (drops unused leftovers, removes excess old files and
// compresses whatever is still uncompressed) and then rotates the latest log.
// It stops at the first step that fails.
func (c *containerLogManager) rotateLog(id, log string) error {
	// Rotated files share the log path as prefix, followed by a dotted suffix.
	rotatedPattern := log + ".*"
	rotated, err := filepath.Glob(rotatedPattern)
	if err != nil {
		return fmt.Errorf("failed to list all log files with pattern %q: %v", rotatedPattern, err)
	}

	if rotated, err = c.cleanupUnusedLogs(rotated); err != nil {
		return fmt.Errorf("failed to cleanup logs: %v", err)
	}

	if rotated, err = c.removeExcessLogs(rotated); err != nil {
		return fmt.Errorf("failed to remove excess logs: %v", err)
	}

	// Every remaining rotated file that is not compressed yet gets compressed.
	for _, name := range rotated {
		if !strings.HasSuffix(name, compressSuffix) {
			if err := c.compressLog(name); err != nil {
				return fmt.Errorf("failed to compress log %q: %v", name, err)
			}
		}
	}

	if err := c.rotateLatestLog(id, log); err != nil {
		return fmt.Errorf("failed to rotate log %q: %v", log, err)
	}
	return nil
}
// cleanupUnusedLogs deletes the temporary or unused log files that a previous
// failed rotation left behind and returns the files that are still in use.
// It stops at the first file that cannot be removed.
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
	keep, stale := filterUnusedLogs(logs)
	for i := range stale {
		if err := os.Remove(stale[i]); err != nil {
			return nil, fmt.Errorf("failed to remove unused log %q: %v", stale[i], err)
		}
	}
	return keep, nil
}
// filterUnusedLogs partitions logs into the files that are still in use and
// the files that are not, preserving the input order within each group.
func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
	for i := range logs {
		candidate := logs[i]
		if !isInUse(candidate, logs) {
			unused = append(unused, candidate)
			continue
		}
		inuse = append(inuse, candidate)
	}
	return inuse, unused
}
// isInUse reports whether the log file l is still in use, given the full
// list of files found next to it.
func isInUse(l string, logs []string) bool {
	switch {
	case strings.HasSuffix(l, tmpSuffix):
		// Temporary files are never in use.
		return false
	case strings.HasSuffix(l, compressSuffix):
		// Compressed logs are always in use.
		return true
	}
	// An uncompressed file whose compressed counterpart already exists has
	// been superseded and is no longer in use.
	compressed := l + compressSuffix
	for i := range logs {
		if logs[i] == compressed {
			return false
		}
	}
	return true
}
// removeExcessLogs deletes the oldest rotated logs so that, once the latest
// log is rotated and the container creates a new one, at most MaxFiles log
// files exist. It returns the rotated logs that were kept, and stops at the
// first file that cannot be removed.
func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
	// Sorting by name orders the files from oldest to newest.
	sort.Strings(logs)
	// Two slots are reserved: one for the log the container will create and
	// one for the latest log that is about to be rotated. Only MaxFiles-2
	// previously rotated files may stay.
	keep := c.policy.MaxFiles - 2
	if keep < 0 {
		keep = 0
	}
	excess := len(logs) - keep
	if excess < 0 {
		excess = 0
	}
	// Remove from oldest to newest, so as not to break ongoing `kubectl logs`.
	for _, old := range logs[:excess] {
		if err := os.Remove(old); err != nil {
			return nil, fmt.Errorf("failed to remove old log %q: %v", old, err)
		}
	}
	return logs[excess:], nil
}
// compressLog compresses log into log.gz with gzip and then removes log.
//
// The compressed data is first written to a temporary file (log plus the
// temporary suffix). Only after the gzip stream and the temporary file have
// been closed successfully is the temporary file renamed to its final name
// and the original log removed, so a failed flush can never leave a truncated
// archive in place of the original.
func (c *containerLogManager) compressLog(log string) error {
	r, err := os.Open(log)
	if err != nil {
		return fmt.Errorf("failed to open log %q: %v", log, err)
	}
	defer r.Close()
	tmpLog := log + tmpSuffix
	f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
	}
	// Best effort cleanup of tmpLog. After a successful rename the file no
	// longer exists under this name and the removal is a harmless no-op.
	defer os.Remove(tmpLog)
	// Safety net for the early-return paths; closing an already closed file
	// only yields an error that is intentionally ignored here.
	defer f.Close()
	w := gzip.NewWriter(f)
	if _, err := io.Copy(w, r); err != nil {
		return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
	}
	// Closing the gzip writer flushes buffered data and writes the gzip
	// footer. This must succeed before the archive is published.
	if err := w.Close(); err != nil {
		return fmt.Errorf("failed to finish compressing %q to %q: %v", log, tmpLog, err)
	}
	// Close the file before renaming: it surfaces write errors and renaming
	// an open file is not possible on every platform.
	if err := f.Close(); err != nil {
		return fmt.Errorf("failed to close temporary log %q: %v", tmpLog, err)
	}
	compressedLog := log + compressSuffix
	if err := os.Rename(tmpLog, compressedLog); err != nil {
		return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
	}
	// Remove old log file.
	if err := os.Remove(log); err != nil {
		return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
	}
	return nil
}
// rotateLatestLog renames the current log to a timestamped name and asks the
// runtime to reopen the container log. The rotated file is left uncompressed
// so that the container can still write to it and fluentd can finish reading.
// When reopening fails, the rename is undone so rotation can be retried in
// the next round.
func (c *containerLogManager) rotateLatestLog(id, log string) error {
	suffix := c.clock.Now().Format(timestampFormat)
	rotated := log + "." + suffix
	if err := os.Rename(log, rotated); err != nil {
		return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
	}

	reopenErr := c.runtimeService.ReopenContainerLog(id)
	if reopenErr == nil {
		return nil
	}

	// Move the rotated log back to its original name.
	// If kubelet gets restarted at this point, we'll lose original log.
	if renameErr := os.Rename(rotated, log); renameErr != nil {
		// This shouldn't happen. Report it, because the original log is
		// lost in that case.
		glog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, reopenErr)
	}
	return fmt.Errorf("failed to reopen container log %q: %v", id, reopenErr)
}

View File

@ -0,0 +1,26 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
// containerLogManagerStub is a ContainerLogManager that performs no log
// management.
type containerLogManagerStub struct{}
// Start does nothing.
func (*containerLogManagerStub) Start() {}
// NewStubContainerLogManager returns a ContainerLogManager whose Start method
// is a no-op.
func NewStubContainerLogManager() ContainerLogManager {
return &containerLogManagerStub{}
}

View File

@ -0,0 +1,324 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/clock"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
critest "k8s.io/kubernetes/pkg/kubelet/apis/cri/testing"
)
// TestGetAllLogs creates a set of rotated, compressed, superseded and
// temporary files for one log and checks that GetAllLogs returns only the
// in-use ones, sorted by name, with the current log last.
func TestGetAllLogs(t *testing.T) {
	dir, err := ioutil.TempDir("", "test-get-all-logs")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// Files present on disk, deliberately out of order.
	present := []string{
		"test-log.11111111-111111.gz",
		"test-log",
		"test-log.00000000-000000.gz",
		"test-log.19900322-000000.gz",
		"test-log.19900322-111111.gz",
		"test-log.19880620-000000", // unused log
		"test-log.19880620-000000.gz",
		"test-log.19880620-111111.gz",
		"test-log.20180101-000000",
		"test-log.20180101-000000.tmp", // temporary log
	}
	for _, name := range present {
		f, err := os.Create(filepath.Join(dir, name))
		require.NoError(t, err)
		f.Close()
	}

	// Expected result: in-use files only, current log at the end.
	want := []string{
		"test-log.00000000-000000.gz",
		"test-log.11111111-111111.gz",
		"test-log.19880620-000000.gz",
		"test-log.19880620-111111.gz",
		"test-log.19900322-000000.gz",
		"test-log.19900322-111111.gz",
		"test-log.20180101-000000",
		"test-log",
	}
	for i, name := range want {
		want[i] = filepath.Join(dir, name)
	}

	got, err := GetAllLogs(filepath.Join(dir, "test-log"))
	assert.NoError(t, err)
	assert.Equal(t, want, got)
}
// TestRotateLogs performs a single rotateLogs pass against a fake runtime
// holding four containers and then inspects the log directory:
//   - a running container whose log is below MaxSize keeps its log as is;
//   - a running container whose log exceeds MaxSize gets it renamed with the
//     fake clock's timestamp;
//   - a running container that already has rotated logs ends up with the
//     previously uncompressed rotated log compressed, plus a freshly
//     timestamped one;
//   - an exited container keeps its log untouched.
func TestRotateLogs(t *testing.T) {
	const (
		maxFiles = 3
		maxSize  = 10
	)

	tmpDir, err := ioutil.TempDir("", "test-rotate-logs")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	start := time.Now()
	fakeRuntime := critest.NewFakeRuntimeService()
	manager := &containerLogManager{
		runtimeService: fakeRuntime,
		policy: LogRotatePolicy{
			MaxSize:  maxSize,
			MaxFiles: maxFiles,
		},
		clock: clock.NewFakeClock(start),
	}

	// Log files present before rotation, together with their content.
	files := []struct {
		name    string
		content string
	}{
		{name: "test-log-1", content: "short"},
		{name: "test-log-2", content: "longer than 10 bytes"},
		{name: "test-log-3", content: "longer than 10 bytes"},
		{name: "test-log-4", content: "longer than 10 bytes"},
		{name: "test-log-3.00000000-000001", content: "the length doesn't matter"},
		{name: "test-log-3.00000000-000000.gz", content: "the length doesn't matter"},
	}
	for _, file := range files {
		fh, err := os.Create(filepath.Join(tmpDir, file.name))
		require.NoError(t, err)
		_, err = fh.WriteString(file.content)
		require.NoError(t, err)
		fh.Close()
	}

	// fakeContainer builds a fake container whose log lives in tmpDir.
	fakeContainer := func(id string, state runtimeapi.ContainerState, logName string) *critest.FakeContainer {
		return &critest.FakeContainer{
			ContainerStatus: runtimeapi.ContainerStatus{
				Id:      id,
				State:   state,
				LogPath: filepath.Join(tmpDir, logName),
			},
		}
	}
	fakeRuntime.SetFakeContainers([]*critest.FakeContainer{
		fakeContainer("container-not-need-rotate", runtimeapi.ContainerState_CONTAINER_RUNNING, files[0].name),
		fakeContainer("container-need-rotate", runtimeapi.ContainerState_CONTAINER_RUNNING, files[1].name),
		fakeContainer("container-has-excess-log", runtimeapi.ContainerState_CONTAINER_RUNNING, files[2].name),
		fakeContainer("container-is-not-running", runtimeapi.ContainerState_CONTAINER_EXITED, files[3].name),
	})

	require.NoError(t, manager.rotateLogs())

	// Directory entries expected after rotation, in ReadDir order.
	stamp := start.Format(timestampFormat)
	want := []string{
		files[0].name,
		files[1].name + "." + stamp,
		files[4].name + compressSuffix,
		files[2].name + "." + stamp,
		files[3].name,
	}
	entries, err := ioutil.ReadDir(tmpDir)
	require.NoError(t, err)
	assert.Len(t, entries, len(want))
	for i, name := range want {
		assert.Equal(t, name, entries[i].Name())
	}
}
// TestCleanupUnusedLog checks that cleanupUnusedLogs keeps the regular log
// and the compressed log, both in its return value and on disk, while the
// temporary log and the unused log are gone from the directory afterwards.
func TestCleanupUnusedLog(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "test-cleanup-unused-log")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	paths := []string{
		filepath.Join(tmpDir, "test-log-1"),     // regular log
		filepath.Join(tmpDir, "test-log-1.tmp"), // temporary log
		filepath.Join(tmpDir, "test-log-2"),     // unused log
		filepath.Join(tmpDir, "test-log-2.gz"),  // compressed log
	}
	for _, p := range paths {
		fh, err := os.Create(p)
		require.NoError(t, err)
		fh.Close()
	}

	manager := &containerLogManager{}
	remaining, err := manager.cleanupUnusedLogs(paths)
	require.NoError(t, err)

	// Only the regular and the compressed log are expected to survive.
	kept := []string{paths[0], paths[3]}
	assert.Len(t, remaining, 2)
	assert.Equal(t, kept, remaining)

	entries, err := ioutil.ReadDir(tmpDir)
	require.NoError(t, err)
	assert.Len(t, entries, 2)
	for i, p := range kept {
		assert.Equal(t, p, filepath.Join(tmpDir, entries[i].Name()))
	}
}
// TestRemoveExcessLog creates four log files per case and checks which of
// them removeExcessLogs keeps for a given MaxFiles, both in the returned
// slice and on disk.
//
// Each case runs as its own subtest so that the deferred temp-dir cleanup
// fires as soon as the case finishes instead of piling up until the whole
// test function returns, and so that a failure in one case does not hide the
// results of the others.
func TestRemoveExcessLog(t *testing.T) {
	for desc, test := range map[string]struct {
		max    int
		expect []string
	}{
		"MaxFiles equal to 2": {
			max:    2,
			expect: []string{},
		},
		"MaxFiles more than 2": {
			max:    3,
			expect: []string{"test-log-4"},
		},
		"MaxFiles more than log file number": {
			max:    6,
			expect: []string{"test-log-1", "test-log-2", "test-log-3", "test-log-4"},
		},
	} {
		test := test // give the subtest closure its own copy of the range variable
		t.Run(desc, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "test-remove-excess-log")
			require.NoError(t, err)
			defer os.RemoveAll(dir)

			// Deliberately unsorted input.
			testLogs := []string{"test-log-3", "test-log-1", "test-log-2", "test-log-4"}
			for i := range testLogs {
				testLogs[i] = filepath.Join(dir, testLogs[i])
				f, err := os.Create(testLogs[i])
				require.NoError(t, err)
				f.Close()
			}

			c := &containerLogManager{policy: LogRotatePolicy{MaxFiles: test.max}}
			got, err := c.removeExcessLogs(testLogs)
			require.NoError(t, err)
			require.Len(t, got, len(test.expect))
			for i, name := range test.expect {
				assert.Equal(t, name, filepath.Base(got[i]))
			}

			// The directory content must match the returned list.
			logs, err := ioutil.ReadDir(dir)
			require.NoError(t, err)
			require.Len(t, logs, len(test.expect))
			for i, name := range test.expect {
				assert.Equal(t, name, logs[i].Name())
			}
		})
	}
}
// TestCompressLog writes a small log file, runs compressLog on it and checks
// that only the compressed file remains (no temporary file, no original),
// and that UncompressLog yields the original content back.
func TestCompressLog(t *testing.T) {
	dir, err := ioutil.TempDir("", "test-compress-log")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	testFile, err := ioutil.TempFile(dir, "test-rotate-latest-log")
	require.NoError(t, err)
	testLog := testFile.Name()
	testContent := "test log content"

	// Close the handle before compressing: compressLog works on the path and
	// removes the original, so nothing should keep the file open. Close is
	// called before either error is checked so the handle is never leaked.
	_, writeErr := testFile.WriteString(testContent)
	closeErr := testFile.Close()
	require.NoError(t, writeErr)
	require.NoError(t, closeErr)

	c := &containerLogManager{}
	require.NoError(t, c.compressLog(testLog))
	_, err = os.Stat(testLog + compressSuffix)
	assert.NoError(t, err, "log should be compressed")
	_, err = os.Stat(testLog + tmpSuffix)
	assert.Error(t, err, "temporary log should be renamed")
	_, err = os.Stat(testLog)
	assert.Error(t, err, "original log should be removed")

	rc, err := UncompressLog(testLog + compressSuffix)
	require.NoError(t, err)
	defer rc.Close()
	var buf bytes.Buffer
	_, err = io.Copy(&buf, rc)
	require.NoError(t, err)
	assert.Equal(t, testContent, buf.String())
}
// TestRotateLatestLog checks rotateLatestLog against a fake runtime service:
// on success the log is expected at its timestamped name and the original
// path is gone; when ReopenContainerLog fails an error is returned and the
// log is expected back at its original path. In both cases
// ReopenContainerLog must have been called exactly once.
//
// Each case runs as a subtest, and the temporary file handle is closed right
// after creation because only its path is needed; deferring the Close inside
// the loop would keep every handle open until the whole test returns.
func TestRotateLatestLog(t *testing.T) {
	dir, err := ioutil.TempDir("", "test-rotate-latest-log")
	require.NoError(t, err)
	defer os.RemoveAll(dir)
	for desc, test := range map[string]struct {
		runtimeError   error
		maxFiles       int
		expectError    bool
		expectOriginal bool
		expectRotated  bool
	}{
		"should successfully rotate log when MaxFiles is 2": {
			maxFiles:       2,
			expectError:    false,
			expectOriginal: false,
			expectRotated:  true,
		},
		"should restore original log when ReopenContainerLog fails": {
			runtimeError:   fmt.Errorf("random error"),
			maxFiles:       2,
			expectError:    true,
			expectOriginal: true,
			expectRotated:  false,
		},
	} {
		test := test // give the subtest closure its own copy of the range variable
		t.Run(desc, func(t *testing.T) {
			now := time.Now()
			f := critest.NewFakeRuntimeService()
			c := &containerLogManager{
				runtimeService: f,
				policy:         LogRotatePolicy{MaxFiles: test.maxFiles},
				clock:          clock.NewFakeClock(now),
			}
			if test.runtimeError != nil {
				f.InjectError("ReopenContainerLog", test.runtimeError)
			}

			testFile, err := ioutil.TempFile(dir, "test-rotate-latest-log")
			require.NoError(t, err)
			testLog := testFile.Name()
			require.NoError(t, testFile.Close())

			// The rotated name is derived from the fake clock's time.
			rotatedLog := fmt.Sprintf("%s.%s", testLog, now.Format(timestampFormat))
			err = c.rotateLatestLog("test-id", testLog)
			assert.Equal(t, test.expectError, err != nil)
			_, err = os.Stat(testLog)
			assert.Equal(t, test.expectOriginal, err == nil)
			_, err = os.Stat(rotatedLog)
			assert.Equal(t, test.expectRotated, err == nil)
			assert.NoError(t, f.AssertCalls([]string{"ReopenContainerLog"}))
		})
	}
}

View File

@ -84,6 +84,7 @@ go_test(
name = "go_default_test",
srcs = [
"apparmor_test.go",
"container_log_rotation_test.go",
"cpu_manager_test.go",
"critical_pod_test.go",
"docker_test.go",
@ -132,6 +133,7 @@ go_test(
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/kubeletconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/logs:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/security/apparmor:go_default_library",

View File

@ -0,0 +1,107 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubelogs "k8s.io/kubernetes/pkg/kubelet/logs"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Settings used by the ContainerLogRotation node e2e test.
const (
// testContainerLogMaxFiles is written to the kubelet's ContainerLogMaxFiles
// and is also the number of log files the test waits for.
testContainerLogMaxFiles = 3
// testContainerLogMaxSize is written to the kubelet's ContainerLogMaxSize.
testContainerLogMaxSize = "40Ki"
// rotationPollInterval is the polling interval for both the Eventually and
// the Consistently check on the number of log files.
rotationPollInterval = 5 * time.Second
// rotationEventuallyTimeout bounds the wait for the log file count to reach
// the max file limit.
rotationEventuallyTimeout = 3 * time.Minute
// rotationConsistentlyTimeout is how long the log file count is watched to
// make sure it stays within the max file limit.
rotationConsistentlyTimeout = 2 * time.Minute
)
// ContainerLogRotation node e2e test: with the CRIContainerLogRotation
// feature gate enabled and the kubelet configured with the test's max file
// count and max size, a container that logs continuously must end up with
// exactly testContainerLogMaxFiles log files and never exceed that number.
// The test is skipped unless the container runtime is the remote runtime.
var _ = framework.KubeDescribe("ContainerLogRotation [Slow] [Serial] [Disruptive]", func() {
	f := framework.NewDefaultFramework("container-log-rotation-test")
	Context("when a container generates a lot of log", func() {
		BeforeEach(func() {
			if framework.TestContext.ContainerRuntime != kubetypes.RemoteContainerRuntime {
				framework.Skipf("Skipping ContainerLogRotation test since the container runtime is not remote")
			}
		})
		tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
			// Assigning into a nil map panics, so make sure the feature gate
			// map exists before enabling the gate.
			if initialConfig.FeatureGates == nil {
				initialConfig.FeatureGates = make(map[string]bool)
			}
			initialConfig.FeatureGates[string(features.CRIContainerLogRotation)] = true
			initialConfig.ContainerLogMaxFiles = testContainerLogMaxFiles
			initialConfig.ContainerLogMaxSize = testContainerLogMaxSize
		})
		It("should be rotated and limited to a fixed amount of files", func() {
			By("create log container")
			pod := &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name: "test-container-log-rotation",
				},
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
					Containers: []v1.Container{
						{
							Name:  "log-container",
							Image: busyboxImage,
							Command: []string{
								"sh",
								"-c",
								// ~12Kb/s. Exceeding 40Kb in 4 seconds. Log rotation period is 10 seconds.
								"while true; do echo hello world; sleep 0.001; done;",
							},
						},
					},
				},
			}
			pod = f.PodClient().CreateSync(pod)

			By("get container log path")
			Expect(len(pod.Status.ContainerStatuses)).To(Equal(1))
			id := kubecontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
			r, _, err := getCRIClient()
			Expect(err).NotTo(HaveOccurred())
			status, err := r.ContainerStatus(id)
			Expect(err).NotTo(HaveOccurred())
			logPath := status.GetLogPath()

			// countLogs reports how many log files currently exist for the
			// container; it is polled by both checks below.
			countLogs := func() (int, error) {
				logs, err := kubelogs.GetAllLogs(logPath)
				if err != nil {
					return 0, err
				}
				return len(logs), nil
			}

			By("wait for container log being rotated to max file limit")
			Eventually(countLogs, rotationEventuallyTimeout, rotationPollInterval).Should(Equal(testContainerLogMaxFiles), "should eventually rotate to max file limit")

			By("make sure container log number won't exceed max file limit")
			Consistently(countLogs, rotationConsistentlyTimeout, rotationPollInterval).Should(BeNumerically("<=", testContainerLogMaxFiles), "should never exceed max file limit")
		})
	})
})