mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Fix an issue where rotated logs of dead containers are not removed.
This commit is contained in:
parent
ff33efc164
commit
5d1b3e26af
@ -38,6 +38,9 @@ type OSInterface interface {
|
|||||||
Pipe() (r *os.File, w *os.File, err error)
|
Pipe() (r *os.File, w *os.File, err error)
|
||||||
ReadDir(dirname string) ([]os.FileInfo, error)
|
ReadDir(dirname string) ([]os.FileInfo, error)
|
||||||
Glob(pattern string) ([]string, error)
|
Glob(pattern string) ([]string, error)
|
||||||
|
Open(name string) (*os.File, error)
|
||||||
|
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
|
||||||
|
Rename(oldpath, newpath string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RealOS is used to dispatch the real system level operations.
|
// RealOS is used to dispatch the real system level operations.
|
||||||
@ -105,3 +108,18 @@ func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|||||||
func (RealOS) Glob(pattern string) ([]string, error) {
|
func (RealOS) Glob(pattern string) ([]string, error) {
|
||||||
return filepath.Glob(pattern)
|
return filepath.Glob(pattern)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open will call os.Open to return the file.
|
||||||
|
func (RealOS) Open(name string) (*os.File, error) {
|
||||||
|
return os.Open(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenFile will call os.OpenFile to return the file.
|
||||||
|
func (RealOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
|
||||||
|
return os.OpenFile(name, flag, perm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rename will call os.Rename to rename a file.
|
||||||
|
func (RealOS) Rename(oldpath, newpath string) error {
|
||||||
|
return os.Rename(oldpath, newpath)
|
||||||
|
}
|
||||||
|
@ -30,6 +30,7 @@ type FakeOS struct {
|
|||||||
ReadDirFn func(string) ([]os.FileInfo, error)
|
ReadDirFn func(string) ([]os.FileInfo, error)
|
||||||
MkdirAllFn func(string, os.FileMode) error
|
MkdirAllFn func(string, os.FileMode) error
|
||||||
SymlinkFn func(string, string) error
|
SymlinkFn func(string, string) error
|
||||||
|
GlobFn func(string, string) bool
|
||||||
HostName string
|
HostName string
|
||||||
Removes []string
|
Removes []string
|
||||||
Files map[string][]*os.FileInfo
|
Files map[string][]*os.FileInfo
|
||||||
@ -78,8 +79,12 @@ func (f *FakeOS) RemoveAll(path string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create is a fake call that returns nil.
|
// Create is a fake call that creates a virtual file and returns nil.
|
||||||
func (FakeOS) Create(path string) (*os.File, error) {
|
func (f *FakeOS) Create(path string) (*os.File, error) {
|
||||||
|
if f.Files == nil {
|
||||||
|
f.Files = make(map[string][]*os.FileInfo)
|
||||||
|
}
|
||||||
|
f.Files[path] = []*os.FileInfo{}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,7 +116,31 @@ func (f *FakeOS) ReadDir(dirname string) ([]os.FileInfo, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Glob is a fake call that returns nil.
|
// Glob is a fake call that returns the list of virtual files matching a pattern.
|
||||||
func (f *FakeOS) Glob(pattern string) ([]string, error) {
|
func (f *FakeOS) Glob(pattern string) ([]string, error) {
|
||||||
|
if f.GlobFn != nil {
|
||||||
|
var res []string
|
||||||
|
for k := range f.Files {
|
||||||
|
if f.GlobFn(pattern, k) {
|
||||||
|
res = append(res, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open is a fake call that returns nil.
|
||||||
|
func (FakeOS) Open(name string) (*os.File, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenFile is a fake call that returns nil.
|
||||||
|
func (FakeOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rename is a fake call that returns nil.
|
||||||
|
func (FakeOS) Rename(oldpath, newpath string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -585,6 +585,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
|
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
|
||||||
|
// setup containerLogManager for CRI container runtime
|
||||||
|
containerLogManager, err := logs.NewContainerLogManager(
|
||||||
|
klet.runtimeService,
|
||||||
|
kubeDeps.OSInterface,
|
||||||
|
kubeCfg.ContainerLogMaxSize,
|
||||||
|
int(kubeCfg.ContainerLogMaxFiles),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
|
||||||
|
}
|
||||||
|
klet.containerLogManager = containerLogManager
|
||||||
|
} else {
|
||||||
|
klet.containerLogManager = logs.NewStubContainerLogManager()
|
||||||
|
}
|
||||||
|
|
||||||
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
|
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
|
||||||
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
|
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
|
||||||
klet.livenessManager,
|
klet.livenessManager,
|
||||||
@ -605,6 +621,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
kubeDeps.RemoteImageService,
|
kubeDeps.RemoteImageService,
|
||||||
kubeDeps.ContainerManager.InternalContainerLifecycle(),
|
kubeDeps.ContainerManager.InternalContainerLifecycle(),
|
||||||
kubeDeps.dockerLegacyService,
|
kubeDeps.dockerLegacyService,
|
||||||
|
klet.containerLogManager,
|
||||||
klet.runtimeClassManager,
|
klet.runtimeClassManager,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -662,21 +679,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
}
|
}
|
||||||
klet.imageManager = imageManager
|
klet.imageManager = imageManager
|
||||||
|
|
||||||
if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
|
|
||||||
// setup containerLogManager for CRI container runtime
|
|
||||||
containerLogManager, err := logs.NewContainerLogManager(
|
|
||||||
klet.runtimeService,
|
|
||||||
kubeCfg.ContainerLogMaxSize,
|
|
||||||
int(kubeCfg.ContainerLogMaxFiles),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
|
|
||||||
}
|
|
||||||
klet.containerLogManager = containerLogManager
|
|
||||||
} else {
|
|
||||||
klet.containerLogManager = logs.NewStubContainerLogManager()
|
|
||||||
}
|
|
||||||
|
|
||||||
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
|
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
|
||||||
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
|
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -41,6 +41,7 @@ go_library(
|
|||||||
"//pkg/kubelet/images:go_default_library",
|
"//pkg/kubelet/images:go_default_library",
|
||||||
"//pkg/kubelet/kuberuntime/logs:go_default_library",
|
"//pkg/kubelet/kuberuntime/logs:go_default_library",
|
||||||
"//pkg/kubelet/lifecycle:go_default_library",
|
"//pkg/kubelet/lifecycle:go_default_library",
|
||||||
|
"//pkg/kubelet/logs:go_default_library",
|
||||||
"//pkg/kubelet/metrics:go_default_library",
|
"//pkg/kubelet/metrics:go_default_library",
|
||||||
"//pkg/kubelet/prober/results:go_default_library",
|
"//pkg/kubelet/prober/results:go_default_library",
|
||||||
"//pkg/kubelet/runtimeclass:go_default_library",
|
"//pkg/kubelet/runtimeclass:go_default_library",
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/logs"
|
||||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -73,6 +74,10 @@ func (f *fakePodStateProvider) IsPodTerminated(uid types.UID) bool {
|
|||||||
|
|
||||||
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
|
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
|
||||||
recorder := &record.FakeRecorder{}
|
recorder := &record.FakeRecorder{}
|
||||||
|
logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
kubeRuntimeManager := &kubeGenericRuntimeManager{
|
kubeRuntimeManager := &kubeGenericRuntimeManager{
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
cpuCFSQuota: false,
|
cpuCFSQuota: false,
|
||||||
@ -88,6 +93,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
|
|||||||
seccompProfileRoot: fakeSeccompProfileRoot,
|
seccompProfileRoot: fakeSeccompProfileRoot,
|
||||||
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
|
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
|
||||||
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
|
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
|
||||||
|
logManager: logManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)
|
typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)
|
||||||
|
@ -855,19 +855,19 @@ func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
|
|||||||
|
|
||||||
// removeContainerLog removes the container log.
|
// removeContainerLog removes the container log.
|
||||||
func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {
|
func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {
|
||||||
// Remove the container log.
|
// Use the log manager to remove the container log together with its rotated copies.
|
||||||
|
err := m.logManager.Clean(containerID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
status, err := m.runtimeService.ContainerStatus(containerID)
|
status, err := m.runtimeService.ContainerStatus(containerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get container status %q: %v", containerID, err)
|
return fmt.Errorf("failed to get container status %q: %v", containerID, err)
|
||||||
}
|
}
|
||||||
labeledInfo := getContainerInfoFromLabels(status.Labels)
|
|
||||||
path := status.GetLogPath()
|
|
||||||
if err := m.osInterface.Remove(path); err != nil && !os.IsNotExist(err) {
|
|
||||||
return fmt.Errorf("failed to remove container %q log %q: %v", containerID, path, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove the legacy container log symlink.
|
// Remove the legacy container log symlink.
|
||||||
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
|
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
|
||||||
|
labeledInfo := getContainerInfoFromLabels(status.Labels)
|
||||||
legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,
|
legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,
|
||||||
labeledInfo.PodNamespace)
|
labeledInfo.PodNamespace)
|
||||||
if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {
|
if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {
|
||||||
|
@ -19,6 +19,7 @@ package kuberuntime
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -65,12 +66,22 @@ func TestRemoveContainer(t *testing.T) {
|
|||||||
|
|
||||||
containerID := fakeContainers[0].Id
|
containerID := fakeContainers[0].Id
|
||||||
fakeOS := m.osInterface.(*containertest.FakeOS)
|
fakeOS := m.osInterface.(*containertest.FakeOS)
|
||||||
|
fakeOS.GlobFn = func(pattern, path string) bool {
|
||||||
|
pattern = strings.Replace(pattern, "*", ".*", -1)
|
||||||
|
return regexp.MustCompile(pattern).MatchString(path)
|
||||||
|
}
|
||||||
|
expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log")
|
||||||
|
expectedContainerLogPathRotated := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log.20060102-150405")
|
||||||
|
expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new")
|
||||||
|
|
||||||
|
fakeOS.Create(expectedContainerLogPath)
|
||||||
|
fakeOS.Create(expectedContainerLogPathRotated)
|
||||||
|
|
||||||
err = m.removeContainer(containerID)
|
err = m.removeContainer(containerID)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
// Verify container log is removed
|
// Verify container log is removed
|
||||||
expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log")
|
|
||||||
expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new")
|
assert.Equal(t, []string{expectedContainerLogPath, expectedContainerLogPathRotated, expectedContainerLogSymlink}, fakeOS.Removes)
|
||||||
assert.Equal(t, fakeOS.Removes, []string{expectedContainerLogPath, expectedContainerLogSymlink})
|
|
||||||
// Verify container is removed
|
// Verify container is removed
|
||||||
assert.Contains(t, fakeRuntime.Called, "RemoveContainer")
|
assert.Contains(t, fakeRuntime.Called, "RemoveContainer")
|
||||||
containers, err := fakeRuntime.ListContainers(&runtimeapi.ContainerFilter{Id: containerID})
|
containers, err := fakeRuntime.ListContainers(&runtimeapi.ContainerFilter{Id: containerID})
|
||||||
|
@ -46,6 +46,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/logs"
|
||||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
|
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
@ -127,6 +128,9 @@ type kubeGenericRuntimeManager struct {
|
|||||||
// A shim to legacy functions for backward compatibility.
|
// A shim to legacy functions for backward compatibility.
|
||||||
legacyLogProvider LegacyLogProvider
|
legacyLogProvider LegacyLogProvider
|
||||||
|
|
||||||
|
// Manage container logs.
|
||||||
|
logManager logs.ContainerLogManager
|
||||||
|
|
||||||
// Manage RuntimeClass resources.
|
// Manage RuntimeClass resources.
|
||||||
runtimeClassManager *runtimeclass.Manager
|
runtimeClassManager *runtimeclass.Manager
|
||||||
|
|
||||||
@ -168,6 +172,7 @@ func NewKubeGenericRuntimeManager(
|
|||||||
imageService internalapi.ImageManagerService,
|
imageService internalapi.ImageManagerService,
|
||||||
internalLifecycle cm.InternalContainerLifecycle,
|
internalLifecycle cm.InternalContainerLifecycle,
|
||||||
legacyLogProvider LegacyLogProvider,
|
legacyLogProvider LegacyLogProvider,
|
||||||
|
logManager logs.ContainerLogManager,
|
||||||
runtimeClassManager *runtimeclass.Manager,
|
runtimeClassManager *runtimeclass.Manager,
|
||||||
) (KubeGenericRuntime, error) {
|
) (KubeGenericRuntime, error) {
|
||||||
kubeRuntimeManager := &kubeGenericRuntimeManager{
|
kubeRuntimeManager := &kubeGenericRuntimeManager{
|
||||||
@ -185,6 +190,7 @@ func NewKubeGenericRuntimeManager(
|
|||||||
keyring: credentialprovider.NewDockerKeyring(),
|
keyring: credentialprovider.NewDockerKeyring(),
|
||||||
internalLifecycle: internalLifecycle,
|
internalLifecycle: internalLifecycle,
|
||||||
legacyLogProvider: legacyLogProvider,
|
legacyLogProvider: legacyLogProvider,
|
||||||
|
logManager: logManager,
|
||||||
runtimeClassManager: runtimeClassManager,
|
runtimeClassManager: runtimeClassManager,
|
||||||
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
|
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ go_library(
|
|||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/logs",
|
importpath = "k8s.io/kubernetes/pkg/kubelet/logs",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//pkg/kubelet/container:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
@ -23,6 +24,7 @@ go_test(
|
|||||||
srcs = ["container_log_manager_test.go"],
|
srcs = ["container_log_manager_test.go"],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
|
"//pkg/kubelet/container:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
|
||||||
"//staging/src/k8s.io/cri-api/pkg/apis/testing:go_default_library",
|
"//staging/src/k8s.io/cri-api/pkg/apis/testing:go_default_library",
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -33,6 +34,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
internalapi "k8s.io/cri-api/pkg/apis"
|
internalapi "k8s.io/cri-api/pkg/apis"
|
||||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||||
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -55,6 +57,8 @@ type ContainerLogManager interface {
|
|||||||
// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
|
// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
|
||||||
// Start container log manager.
|
// Start container log manager.
|
||||||
Start()
|
Start()
|
||||||
|
// Clean removes all logs of the specified container.
|
||||||
|
Clean(containerID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
|
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
|
||||||
@ -142,12 +146,14 @@ func parseMaxSize(size string) (int64, error) {
|
|||||||
|
|
||||||
type containerLogManager struct {
|
type containerLogManager struct {
|
||||||
runtimeService internalapi.RuntimeService
|
runtimeService internalapi.RuntimeService
|
||||||
|
osInterface kubecontainer.OSInterface
|
||||||
policy LogRotatePolicy
|
policy LogRotatePolicy
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
|
mutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewContainerLogManager creates a new container log manager.
|
// NewContainerLogManager creates a new container log manager.
|
||||||
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
|
func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) {
|
||||||
if maxFiles <= 1 {
|
if maxFiles <= 1 {
|
||||||
return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
|
return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
|
||||||
}
|
}
|
||||||
@ -157,12 +163,14 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize s
|
|||||||
}
|
}
|
||||||
// policy LogRotatePolicy
|
// policy LogRotatePolicy
|
||||||
return &containerLogManager{
|
return &containerLogManager{
|
||||||
|
osInterface: osInterface,
|
||||||
runtimeService: runtimeService,
|
runtimeService: runtimeService,
|
||||||
policy: LogRotatePolicy{
|
policy: LogRotatePolicy{
|
||||||
MaxSize: parsedMaxSize,
|
MaxSize: parsedMaxSize,
|
||||||
MaxFiles: maxFiles,
|
MaxFiles: maxFiles,
|
||||||
},
|
},
|
||||||
clock: clock.RealClock{},
|
clock: clock.RealClock{},
|
||||||
|
mutex: sync.Mutex{},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,7 +184,32 @@ func (c *containerLogManager) Start() {
|
|||||||
}, logMonitorPeriod)
|
}, logMonitorPeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean removes all logs of the specified container (including rotated ones).
|
||||||
|
func (c *containerLogManager) Clean(containerID string) error {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
status, err := c.runtimeService.ContainerStatus(containerID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get container status %q: %v", containerID, err)
|
||||||
|
}
|
||||||
|
pattern := fmt.Sprintf("%s*", status.GetLogPath())
|
||||||
|
logs, err := c.osInterface.Glob(pattern)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, l := range logs {
|
||||||
|
if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *containerLogManager) rotateLogs() error {
|
func (c *containerLogManager) rotateLogs() error {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
// TODO(#59998): Use kubelet pod cache.
|
// TODO(#59998): Use kubelet pod cache.
|
||||||
containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
|
containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -197,7 +230,7 @@ func (c *containerLogManager) rotateLogs() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
path := status.GetLogPath()
|
path := status.GetLogPath()
|
||||||
info, err := os.Stat(path)
|
info, err := c.osInterface.Stat(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
klog.Errorf("Failed to stat container log %q: %v", path, err)
|
klog.Errorf("Failed to stat container log %q: %v", path, err)
|
||||||
@ -211,7 +244,7 @@ func (c *containerLogManager) rotateLogs() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// The container log should be recovered.
|
// The container log should be recovered.
|
||||||
info, err = os.Stat(path)
|
info, err = c.osInterface.Stat(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
|
klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
|
||||||
continue
|
continue
|
||||||
@ -269,7 +302,7 @@ func (c *containerLogManager) rotateLog(id, log string) error {
|
|||||||
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
|
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
|
||||||
inuse, unused := filterUnusedLogs(logs)
|
inuse, unused := filterUnusedLogs(logs)
|
||||||
for _, l := range unused {
|
for _, l := range unused {
|
||||||
if err := os.Remove(l); err != nil {
|
if err := c.osInterface.Remove(l); err != nil {
|
||||||
return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
|
return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -322,7 +355,7 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
|
|||||||
}
|
}
|
||||||
i := 0
|
i := 0
|
||||||
for ; i < len(logs)-maxRotatedFiles; i++ {
|
for ; i < len(logs)-maxRotatedFiles; i++ {
|
||||||
if err := os.Remove(logs[i]); err != nil {
|
if err := c.osInterface.Remove(logs[i]); err != nil {
|
||||||
return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
|
return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -332,19 +365,19 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error)
|
|||||||
|
|
||||||
// compressLog compresses a log to log.gz with gzip.
|
// compressLog compresses a log to log.gz with gzip.
|
||||||
func (c *containerLogManager) compressLog(log string) error {
|
func (c *containerLogManager) compressLog(log string) error {
|
||||||
r, err := os.Open(log)
|
r, err := c.osInterface.Open(log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open log %q: %v", log, err)
|
return fmt.Errorf("failed to open log %q: %v", log, err)
|
||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
tmpLog := log + tmpSuffix
|
tmpLog := log + tmpSuffix
|
||||||
f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
|
return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// Best effort cleanup of tmpLog.
|
// Best effort cleanup of tmpLog.
|
||||||
os.Remove(tmpLog)
|
c.osInterface.Remove(tmpLog)
|
||||||
}()
|
}()
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
w := gzip.NewWriter(f)
|
w := gzip.NewWriter(f)
|
||||||
@ -353,11 +386,11 @@ func (c *containerLogManager) compressLog(log string) error {
|
|||||||
return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
|
return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
|
||||||
}
|
}
|
||||||
compressedLog := log + compressSuffix
|
compressedLog := log + compressSuffix
|
||||||
if err := os.Rename(tmpLog, compressedLog); err != nil {
|
if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil {
|
||||||
return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
|
return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
|
||||||
}
|
}
|
||||||
// Remove old log file.
|
// Remove old log file.
|
||||||
if err := os.Remove(log); err != nil {
|
if err := c.osInterface.Remove(log); err != nil {
|
||||||
return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
|
return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -368,14 +401,14 @@ func (c *containerLogManager) compressLog(log string) error {
|
|||||||
func (c *containerLogManager) rotateLatestLog(id, log string) error {
|
func (c *containerLogManager) rotateLatestLog(id, log string) error {
|
||||||
timestamp := c.clock.Now().Format(timestampFormat)
|
timestamp := c.clock.Now().Format(timestampFormat)
|
||||||
rotated := fmt.Sprintf("%s.%s", log, timestamp)
|
rotated := fmt.Sprintf("%s.%s", log, timestamp)
|
||||||
if err := os.Rename(log, rotated); err != nil {
|
if err := c.osInterface.Rename(log, rotated); err != nil {
|
||||||
return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
|
return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
|
||||||
}
|
}
|
||||||
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
|
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
|
||||||
// Rename the rotated log back, so that we can try rotating it again
|
// Rename the rotated log back, so that we can try rotating it again
|
||||||
// next round.
|
// next round.
|
||||||
// If kubelet gets restarted at this point, we'll lose original log.
|
// If kubelet gets restarted at this point, we'll lose original log.
|
||||||
if renameErr := os.Rename(rotated, log); renameErr != nil {
|
if renameErr := c.osInterface.Rename(rotated, log); renameErr != nil {
|
||||||
// This shouldn't happen.
|
// This shouldn't happen.
|
||||||
// Report an error if this happens, because we will lose original
|
// Report an error if this happens, because we will lose original
|
||||||
// log.
|
// log.
|
||||||
|
@ -20,6 +20,10 @@ type containerLogManagerStub struct{}
|
|||||||
|
|
||||||
func (*containerLogManagerStub) Start() {}
|
func (*containerLogManagerStub) Start() {}
|
||||||
|
|
||||||
|
func (*containerLogManagerStub) Clean(containerID string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewStubContainerLogManager returns an empty ContainerLogManager which does nothing.
|
// NewStubContainerLogManager returns an empty ContainerLogManager which does nothing.
|
||||||
func NewStubContainerLogManager() ContainerLogManager {
|
func NewStubContainerLogManager() ContainerLogManager {
|
||||||
return &containerLogManagerStub{}
|
return &containerLogManagerStub{}
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||||
@ -90,7 +91,8 @@ func TestRotateLogs(t *testing.T) {
|
|||||||
MaxSize: testMaxSize,
|
MaxSize: testMaxSize,
|
||||||
MaxFiles: testMaxFiles,
|
MaxFiles: testMaxFiles,
|
||||||
},
|
},
|
||||||
clock: clock.NewFakeClock(now),
|
osInterface: container.RealOS{},
|
||||||
|
clock: clock.NewFakeClock(now),
|
||||||
}
|
}
|
||||||
testLogs := []string{
|
testLogs := []string{
|
||||||
"test-log-1",
|
"test-log-1",
|
||||||
@ -159,6 +161,77 @@ func TestRotateLogs(t *testing.T) {
|
|||||||
assert.Equal(t, testLogs[3], logs[4].Name())
|
assert.Equal(t, testLogs[3], logs[4].Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClean(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "test-clean")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testMaxFiles = 3
|
||||||
|
testMaxSize = 10
|
||||||
|
)
|
||||||
|
now := time.Now()
|
||||||
|
f := critest.NewFakeRuntimeService()
|
||||||
|
c := &containerLogManager{
|
||||||
|
runtimeService: f,
|
||||||
|
policy: LogRotatePolicy{
|
||||||
|
MaxSize: testMaxSize,
|
||||||
|
MaxFiles: testMaxFiles,
|
||||||
|
},
|
||||||
|
osInterface: container.RealOS{},
|
||||||
|
clock: clock.NewFakeClock(now),
|
||||||
|
}
|
||||||
|
testLogs := []string{
|
||||||
|
"test-log-1",
|
||||||
|
"test-log-2",
|
||||||
|
"test-log-3",
|
||||||
|
"test-log-2.00000000-000000.gz",
|
||||||
|
"test-log-2.00000000-000001",
|
||||||
|
"test-log-3.00000000-000000.gz",
|
||||||
|
"test-log-3.00000000-000001",
|
||||||
|
}
|
||||||
|
for i := range testLogs {
|
||||||
|
f, err := os.Create(filepath.Join(dir, testLogs[i]))
|
||||||
|
require.NoError(t, err)
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
testContainers := []*critest.FakeContainer{
|
||||||
|
{
|
||||||
|
ContainerStatus: runtimeapi.ContainerStatus{
|
||||||
|
Id: "container-1",
|
||||||
|
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
|
||||||
|
LogPath: filepath.Join(dir, testLogs[0]),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ContainerStatus: runtimeapi.ContainerStatus{
|
||||||
|
Id: "container-2",
|
||||||
|
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
|
||||||
|
LogPath: filepath.Join(dir, testLogs[1]),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ContainerStatus: runtimeapi.ContainerStatus{
|
||||||
|
Id: "container-3",
|
||||||
|
State: runtimeapi.ContainerState_CONTAINER_EXITED,
|
||||||
|
LogPath: filepath.Join(dir, testLogs[2]),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
f.SetFakeContainers(testContainers)
|
||||||
|
|
||||||
|
err = c.Clean("container-3")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
logs, err := ioutil.ReadDir(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, logs, 4)
|
||||||
|
assert.Equal(t, testLogs[0], logs[0].Name())
|
||||||
|
assert.Equal(t, testLogs[1], logs[1].Name())
|
||||||
|
assert.Equal(t, testLogs[3], logs[2].Name())
|
||||||
|
assert.Equal(t, testLogs[4], logs[3].Name())
|
||||||
|
}
|
||||||
|
|
||||||
func TestCleanupUnusedLog(t *testing.T) {
|
func TestCleanupUnusedLog(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "test-cleanup-unused-log")
|
dir, err := ioutil.TempDir("", "test-cleanup-unused-log")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -178,7 +251,9 @@ func TestCleanupUnusedLog(t *testing.T) {
|
|||||||
f.Close()
|
f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &containerLogManager{}
|
c := &containerLogManager{
|
||||||
|
osInterface: container.RealOS{},
|
||||||
|
}
|
||||||
got, err := c.cleanupUnusedLogs(testLogs)
|
got, err := c.cleanupUnusedLogs(testLogs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Len(t, got, 2)
|
assert.Len(t, got, 2)
|
||||||
@ -223,7 +298,10 @@ func TestRemoveExcessLog(t *testing.T) {
|
|||||||
f.Close()
|
f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &containerLogManager{policy: LogRotatePolicy{MaxFiles: test.max}}
|
c := &containerLogManager{
|
||||||
|
policy: LogRotatePolicy{MaxFiles: test.max},
|
||||||
|
osInterface: container.RealOS{},
|
||||||
|
}
|
||||||
got, err := c.removeExcessLogs(testLogs)
|
got, err := c.removeExcessLogs(testLogs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, got, len(test.expect))
|
require.Len(t, got, len(test.expect))
|
||||||
@ -253,7 +331,7 @@ func TestCompressLog(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
testLog := testFile.Name()
|
testLog := testFile.Name()
|
||||||
c := &containerLogManager{}
|
c := &containerLogManager{osInterface: container.RealOS{}}
|
||||||
require.NoError(t, c.compressLog(testLog))
|
require.NoError(t, c.compressLog(testLog))
|
||||||
_, err = os.Stat(testLog + compressSuffix)
|
_, err = os.Stat(testLog + compressSuffix)
|
||||||
assert.NoError(t, err, "log should be compressed")
|
assert.NoError(t, err, "log should be compressed")
|
||||||
@ -303,6 +381,7 @@ func TestRotateLatestLog(t *testing.T) {
|
|||||||
c := &containerLogManager{
|
c := &containerLogManager{
|
||||||
runtimeService: f,
|
runtimeService: f,
|
||||||
policy: LogRotatePolicy{MaxFiles: test.maxFiles},
|
policy: LogRotatePolicy{MaxFiles: test.maxFiles},
|
||||||
|
osInterface: container.RealOS{},
|
||||||
clock: clock.NewFakeClock(now),
|
clock: clock.NewFakeClock(now),
|
||||||
}
|
}
|
||||||
if test.runtimeError != nil {
|
if test.runtimeError != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user