From 5d1b3e26af73dde33ecb6a3e69fb5876ceab192f Mon Sep 17 00:00:00 2001 From: Marian Lobur Date: Wed, 22 Jul 2020 12:06:15 +0200 Subject: [PATCH] Fix an issue when rotated logs of dead containers are not removed. --- pkg/kubelet/container/os.go | 18 ++++ pkg/kubelet/container/testing/os.go | 35 +++++++- pkg/kubelet/kubelet.go | 32 +++---- pkg/kubelet/kuberuntime/BUILD | 1 + .../kuberuntime/fake_kuberuntime_manager.go | 6 ++ .../kuberuntime/kuberuntime_container.go | 14 +-- .../kuberuntime/kuberuntime_container_test.go | 17 +++- .../kuberuntime/kuberuntime_manager.go | 6 ++ pkg/kubelet/logs/BUILD | 2 + pkg/kubelet/logs/container_log_manager.go | 57 +++++++++--- .../logs/container_log_manager_stub.go | 4 + .../logs/container_log_manager_test.go | 87 ++++++++++++++++++- 12 files changed, 235 insertions(+), 44 deletions(-) diff --git a/pkg/kubelet/container/os.go b/pkg/kubelet/container/os.go index bd27ae9f079..dae825b957b 100644 --- a/pkg/kubelet/container/os.go +++ b/pkg/kubelet/container/os.go @@ -38,6 +38,9 @@ type OSInterface interface { Pipe() (r *os.File, w *os.File, err error) ReadDir(dirname string) ([]os.FileInfo, error) Glob(pattern string) ([]string, error) + Open(name string) (*os.File, error) + OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) + Rename(oldpath, newpath string) error } // RealOS is used to dispatch the real system level operations. @@ -105,3 +108,18 @@ func (RealOS) ReadDir(dirname string) ([]os.FileInfo, error) { func (RealOS) Glob(pattern string) ([]string, error) { return filepath.Glob(pattern) } + +// Open will call os.Open to return the file. +func (RealOS) Open(name string) (*os.File, error) { + return os.Open(name) +} + +// OpenFile will call os.OpenFile to return the file. +func (RealOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) { + return os.OpenFile(name, flag, perm) +} + +// Rename will call os.Rename to rename a file. +func (RealOS) Rename(oldpath, newpath string) error { + return os.Rename(oldpath, newpath) +} diff --git a/pkg/kubelet/container/testing/os.go b/pkg/kubelet/container/testing/os.go index ee8d255b087..16d344ba06d 100644 --- a/pkg/kubelet/container/testing/os.go +++ b/pkg/kubelet/container/testing/os.go @@ -30,6 +30,7 @@ type FakeOS struct { ReadDirFn func(string) ([]os.FileInfo, error) MkdirAllFn func(string, os.FileMode) error SymlinkFn func(string, string) error + GlobFn func(string, string) bool HostName string Removes []string Files map[string][]*os.FileInfo @@ -78,8 +79,12 @@ func (f *FakeOS) RemoveAll(path string) error { return nil } -// Create is a fake call that returns nil. -func (FakeOS) Create(path string) (*os.File, error) { +// Create is a fake call that creates a virtual file and returns nil. +func (f *FakeOS) Create(path string) (*os.File, error) { + if f.Files == nil { + f.Files = make(map[string][]*os.FileInfo) + } + f.Files[path] = []*os.FileInfo{} return nil, nil } @@ -111,7 +116,31 @@ func (f *FakeOS) ReadDir(dirname string) ([]os.FileInfo, error) { return nil, nil } -// Glob is a fake call that returns nil. +// Glob is a fake call that returns list of virtual files matching a pattern. func (f *FakeOS) Glob(pattern string) ([]string, error) { + if f.GlobFn != nil { + var res []string + for k := range f.Files { + if f.GlobFn(pattern, k) { + res = append(res, k) + } + } + return res, nil + } return nil, nil } + +// Open is a fake call that returns nil. +func (FakeOS) Open(name string) (*os.File, error) { + return nil, nil +} + +// OpenFile is a fake call that return nil. +func (FakeOS) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) { + return nil, nil +} + +// Rename is a fake call that return nil. +func (FakeOS) Rename(oldpath, newpath string) error { + return nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 051671a9fc8..ac328ca913a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -585,6 +585,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient) } + if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) { + // setup containerLogManager for CRI container runtime + containerLogManager, err := logs.NewContainerLogManager( + klet.runtimeService, + kubeDeps.OSInterface, + kubeCfg.ContainerLogMaxSize, + int(kubeCfg.ContainerLogMaxFiles), + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize container log manager: %v", err) + } + klet.containerLogManager = containerLogManager + } else { + klet.containerLogManager = logs.NewStubContainerLogManager() + } + runtime, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, @@ -605,6 +621,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps.RemoteImageService, kubeDeps.ContainerManager.InternalContainerLifecycle(), kubeDeps.dockerLegacyService, + klet.containerLogManager, klet.runtimeClassManager, ) if err != nil { @@ -662,21 +679,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, } klet.imageManager = imageManager - if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) { - // setup containerLogManager for CRI container runtime - containerLogManager, err := logs.NewContainerLogManager( - klet.runtimeService, - kubeCfg.ContainerLogMaxSize, - int(kubeCfg.ContainerLogMaxFiles), - ) - if err != nil { - return nil, fmt.Errorf("failed to initialize container log manager: %v", err) - } - klet.containerLogManager = containerLogManager - } else { - klet.containerLogManager = logs.NewStubContainerLogManager() - } - if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) { klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory) if err != nil { diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index f22fa2f23b9..19306019ceb 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -41,6 +41,7 @@ go_library( "//pkg/kubelet/images:go_default_library", "//pkg/kubelet/kuberuntime/logs:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", + "//pkg/kubelet/logs:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/runtimeclass:go_default_library", diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index df9fc028416..a7190df2087 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -32,6 +32,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/logs" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" ) @@ -73,6 +74,10 @@ func (f *fakePodStateProvider) IsPodTerminated(uid types.UID) bool { func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) { recorder := &record.FakeRecorder{} + logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2) + if err != nil { + return nil, err + } kubeRuntimeManager := &kubeGenericRuntimeManager{ recorder: recorder, cpuCFSQuota: false, @@ -88,6 +93,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS seccompProfileRoot: fakeSeccompProfileRoot, internalLifecycle: cm.NewFakeInternalContainerLifecycle(), logReduction: logreduction.NewLogReduction(identicalErrorDelay), + logManager: logManager, } typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index ec8d9fac03f..af361122c35 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -855,19 +855,19 @@ func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error { // removeContainerLog removes the container log. func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error { - // Remove the container log. + // Use log manager to remove rotated logs. + err := m.logManager.Clean(containerID) + if err != nil { + return err + } + status, err := m.runtimeService.ContainerStatus(containerID) if err != nil { return fmt.Errorf("failed to get container status %q: %v", containerID, err) } - labeledInfo := getContainerInfoFromLabels(status.Labels) - path := status.GetLogPath() - if err := m.osInterface.Remove(path); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove container %q log %q: %v", containerID, path, err) - } - // Remove the legacy container log symlink. // TODO(random-liu): Remove this after cluster logging supports CRI container log path. + labeledInfo := getContainerInfoFromLabels(status.Labels) legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName, labeledInfo.PodNamespace) if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go index 7a4b6a64403..100240354f3 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container_test.go @@ -19,6 +19,7 @@ package kuberuntime import ( "fmt" "path/filepath" + "regexp" "strings" "testing" "time" @@ -65,12 +66,22 @@ func TestRemoveContainer(t *testing.T) { containerID := fakeContainers[0].Id fakeOS := m.osInterface.(*containertest.FakeOS) + fakeOS.GlobFn = func(pattern, path string) bool { + pattern = strings.Replace(pattern, "*", ".*", -1) + return regexp.MustCompile(pattern).MatchString(path) + } + expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log") + expectedContainerLogPathRotated := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log.20060102-150405") + expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new") + + fakeOS.Create(expectedContainerLogPath) + fakeOS.Create(expectedContainerLogPathRotated) + err = m.removeContainer(containerID) assert.NoError(t, err) // Verify container log is removed - expectedContainerLogPath := filepath.Join(podLogsRootDirectory, "new_bar_12345678", "foo", "0.log") - expectedContainerLogSymlink := legacyLogSymlink(containerID, "foo", "bar", "new") - assert.Equal(t, fakeOS.Removes, []string{expectedContainerLogPath, expectedContainerLogSymlink}) + + assert.Equal(t, []string{expectedContainerLogPath, expectedContainerLogPathRotated, expectedContainerLogSymlink}, fakeOS.Removes) // Verify container is removed assert.Contains(t, fakeRuntime.Called, "RemoveContainer") containers, err := fakeRuntime.ListContainers(&runtimeapi.ContainerFilter{Id: containerID}) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index eebba49bbbe..67d121d7321 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/logs" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/runtimeclass" "k8s.io/kubernetes/pkg/kubelet/types" @@ -127,6 +128,9 @@ type kubeGenericRuntimeManager struct { // A shim to legacy functions for backward compatibility. legacyLogProvider LegacyLogProvider + // Manage container logs. + logManager logs.ContainerLogManager + // Manage RuntimeClass resources. runtimeClassManager *runtimeclass.Manager @@ -168,6 +172,7 @@ func NewKubeGenericRuntimeManager( imageService internalapi.ImageManagerService, internalLifecycle cm.InternalContainerLifecycle, legacyLogProvider LegacyLogProvider, + logManager logs.ContainerLogManager, runtimeClassManager *runtimeclass.Manager, ) (KubeGenericRuntime, error) { kubeRuntimeManager := &kubeGenericRuntimeManager{ @@ -185,6 +190,7 @@ func NewKubeGenericRuntimeManager( keyring: credentialprovider.NewDockerKeyring(), internalLifecycle: internalLifecycle, legacyLogProvider: legacyLogProvider, + logManager: logManager, runtimeClassManager: runtimeClassManager, logReduction: logreduction.NewLogReduction(identicalErrorDelay), } diff --git a/pkg/kubelet/logs/BUILD b/pkg/kubelet/logs/BUILD index 7ef5310df09..0b7a608c050 100644 --- a/pkg/kubelet/logs/BUILD +++ b/pkg/kubelet/logs/BUILD @@ -9,6 +9,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/logs", visibility = ["//visibility:public"], deps = [ + "//pkg/kubelet/container:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", @@ -23,6 +24,7 @@ go_test( srcs = ["container_log_manager_test.go"], embed = [":go_default_library"], deps = [ + "//pkg/kubelet/container:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis/testing:go_default_library", diff --git a/pkg/kubelet/logs/container_log_manager.go b/pkg/kubelet/logs/container_log_manager.go index 502f3b622cc..56e4f31169d 100644 --- a/pkg/kubelet/logs/container_log_manager.go +++ b/pkg/kubelet/logs/container_log_manager.go @@ -24,6 +24,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "k8s.io/klog/v2" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) const ( @@ -55,6 +57,8 @@ type ContainerLogManager interface { // TODO(random-liu): Add RotateLogs function and call it under disk pressure. // Start container log manager. Start() + // Clean removes all logs of specified container. + Clean(containerID string) error } // LogRotatePolicy is a policy for container log rotation. The policy applies to all @@ -142,12 +146,14 @@ func parseMaxSize(size string) (int64, error) { type containerLogManager struct { runtimeService internalapi.RuntimeService + osInterface kubecontainer.OSInterface policy LogRotatePolicy clock clock.Clock + mutex sync.Mutex } // NewContainerLogManager creates a new container log manager. -func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) { +func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) { if maxFiles <= 1 { return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles) } @@ -157,12 +163,14 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize s } // policy LogRotatePolicy return &containerLogManager{ + osInterface: osInterface, runtimeService: runtimeService, policy: LogRotatePolicy{ MaxSize: parsedMaxSize, MaxFiles: maxFiles, }, clock: clock.RealClock{}, + mutex: sync.Mutex{}, }, nil } @@ -176,7 +184,32 @@ func (c *containerLogManager) Start() { }, logMonitorPeriod) } +// Clean removes all logs of specified container (including rotated one). +func (c *containerLogManager) Clean(containerID string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + status, err := c.runtimeService.ContainerStatus(containerID) + if err != nil { + return fmt.Errorf("failed to get container status %q: %v", containerID, err) + } + pattern := fmt.Sprintf("%s*", status.GetLogPath()) + logs, err := c.osInterface.Glob(pattern) + if err != nil { + return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err) + } + + for _, l := range logs { + if err := c.osInterface.Remove(l); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove container %q log %q: %v", containerID, l, err) + } + } + + return nil +} + func (c *containerLogManager) rotateLogs() error { + c.mutex.Lock() + defer c.mutex.Unlock() // TODO(#59998): Use kubelet pod cache. containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{}) if err != nil { @@ -197,7 +230,7 @@ func (c *containerLogManager) rotateLogs() error { continue } path := status.GetLogPath() - info, err := os.Stat(path) + info, err := c.osInterface.Stat(path) if err != nil { if !os.IsNotExist(err) { klog.Errorf("Failed to stat container log %q: %v", path, err) @@ -211,7 +244,7 @@ func (c *containerLogManager) rotateLogs() error { continue } // The container log should be recovered. - info, err = os.Stat(path) + info, err = c.osInterface.Stat(path) if err != nil { klog.Errorf("Failed to stat container log %q after reopen: %v", path, err) continue @@ -269,7 +302,7 @@ func (c *containerLogManager) rotateLog(id, log string) error { func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) { inuse, unused := filterUnusedLogs(logs) for _, l := range unused { - if err := os.Remove(l); err != nil { + if err := c.osInterface.Remove(l); err != nil { return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err) } } @@ -322,7 +355,7 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) } i := 0 for ; i < len(logs)-maxRotatedFiles; i++ { - if err := os.Remove(logs[i]); err != nil { + if err := c.osInterface.Remove(logs[i]); err != nil { return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err) } } @@ -332,19 +365,19 @@ func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) // compressLog compresses a log to log.gz with gzip. func (c *containerLogManager) compressLog(log string) error { - r, err := os.Open(log) + r, err := c.osInterface.Open(log) if err != nil { return fmt.Errorf("failed to open log %q: %v", log, err) } defer r.Close() tmpLog := log + tmpSuffix - f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + f, err := c.osInterface.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err) } defer func() { // Best effort cleanup of tmpLog. - os.Remove(tmpLog) + c.osInterface.Remove(tmpLog) }() defer f.Close() w := gzip.NewWriter(f) @@ -353,11 +386,11 @@ func (c *containerLogManager) compressLog(log string) error { return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err) } compressedLog := log + compressSuffix - if err := os.Rename(tmpLog, compressedLog); err != nil { + if err := c.osInterface.Rename(tmpLog, compressedLog); err != nil { return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err) } // Remove old log file. - if err := os.Remove(log); err != nil { + if err := c.osInterface.Remove(log); err != nil { return fmt.Errorf("failed to remove log %q after compress: %v", log, err) } return nil @@ -368,14 +401,14 @@ func (c *containerLogManager) compressLog(log string) error { func (c *containerLogManager) rotateLatestLog(id, log string) error { timestamp := c.clock.Now().Format(timestampFormat) rotated := fmt.Sprintf("%s.%s", log, timestamp) - if err := os.Rename(log, rotated); err != nil { + if err := c.osInterface.Rename(log, rotated); err != nil { return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err) } if err := c.runtimeService.ReopenContainerLog(id); err != nil { // Rename the rotated log back, so that we can try rotating it again // next round. // If kubelet gets restarted at this point, we'll lose original log. - if renameErr := os.Rename(rotated, log); renameErr != nil { + if renameErr := c.osInterface.Rename(rotated, log); renameErr != nil { // This shouldn't happen. // Report an error if this happens, because we will lose original // log. diff --git a/pkg/kubelet/logs/container_log_manager_stub.go b/pkg/kubelet/logs/container_log_manager_stub.go index a6e0729f19d..27db1e42cbf 100644 --- a/pkg/kubelet/logs/container_log_manager_stub.go +++ b/pkg/kubelet/logs/container_log_manager_stub.go @@ -20,6 +20,10 @@ type containerLogManagerStub struct{} func (*containerLogManagerStub) Start() {} +func (*containerLogManagerStub) Clean(containerID string) error { + return nil +} + // NewStubContainerLogManager returns an empty ContainerLogManager which does nothing. func NewStubContainerLogManager() ContainerLogManager { return &containerLogManagerStub{} diff --git a/pkg/kubelet/logs/container_log_manager_test.go b/pkg/kubelet/logs/container_log_manager_test.go index c2bbe6585dd..c4a7fc44593 100644 --- a/pkg/kubelet/logs/container_log_manager_test.go +++ b/pkg/kubelet/logs/container_log_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/apimachinery/pkg/util/clock" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" @@ -90,7 +91,8 @@ func TestRotateLogs(t *testing.T) { MaxSize: testMaxSize, MaxFiles: testMaxFiles, }, - clock: clock.NewFakeClock(now), + osInterface: container.RealOS{}, + clock: clock.NewFakeClock(now), } testLogs := []string{ "test-log-1", @@ -159,6 +161,77 @@ func TestRotateLogs(t *testing.T) { assert.Equal(t, testLogs[3], logs[4].Name()) } +func TestClean(t *testing.T) { + dir, err := ioutil.TempDir("", "test-clean") + require.NoError(t, err) + defer os.RemoveAll(dir) + + const ( + testMaxFiles = 3 + testMaxSize = 10 + ) + now := time.Now() + f := critest.NewFakeRuntimeService() + c := &containerLogManager{ + runtimeService: f, + policy: LogRotatePolicy{ + MaxSize: testMaxSize, + MaxFiles: testMaxFiles, + }, + osInterface: container.RealOS{}, + clock: clock.NewFakeClock(now), + } + testLogs := []string{ + "test-log-1", + "test-log-2", + "test-log-3", + "test-log-2.00000000-000000.gz", + "test-log-2.00000000-000001", + "test-log-3.00000000-000000.gz", + "test-log-3.00000000-000001", + } + for i := range testLogs { + f, err := os.Create(filepath.Join(dir, testLogs[i])) + require.NoError(t, err) + f.Close() + } + testContainers := []*critest.FakeContainer{ + { + ContainerStatus: runtimeapi.ContainerStatus{ + Id: "container-1", + State: runtimeapi.ContainerState_CONTAINER_RUNNING, + LogPath: filepath.Join(dir, testLogs[0]), + }, + }, + { + ContainerStatus: runtimeapi.ContainerStatus{ + Id: "container-2", + State: runtimeapi.ContainerState_CONTAINER_RUNNING, + LogPath: filepath.Join(dir, testLogs[1]), + }, + }, + { + ContainerStatus: runtimeapi.ContainerStatus{ + Id: "container-3", + State: runtimeapi.ContainerState_CONTAINER_EXITED, + LogPath: filepath.Join(dir, testLogs[2]), + }, + }, + } + f.SetFakeContainers(testContainers) + + err = c.Clean("container-3") + require.NoError(t, err) + + logs, err := ioutil.ReadDir(dir) + require.NoError(t, err) + assert.Len(t, logs, 4) + assert.Equal(t, testLogs[0], logs[0].Name()) + assert.Equal(t, testLogs[1], logs[1].Name()) + assert.Equal(t, testLogs[3], logs[2].Name()) + assert.Equal(t, testLogs[4], logs[3].Name()) +} + func TestCleanupUnusedLog(t *testing.T) { dir, err := ioutil.TempDir("", "test-cleanup-unused-log") require.NoError(t, err) @@ -178,7 +251,9 @@ func TestCleanupUnusedLog(t *testing.T) { f.Close() } - c := &containerLogManager{} + c := &containerLogManager{ + osInterface: container.RealOS{}, + } got, err := c.cleanupUnusedLogs(testLogs) require.NoError(t, err) assert.Len(t, got, 2) @@ -223,7 +298,10 @@ func TestRemoveExcessLog(t *testing.T) { f.Close() } - c := &containerLogManager{policy: LogRotatePolicy{MaxFiles: test.max}} + c := &containerLogManager{ + policy: LogRotatePolicy{MaxFiles: test.max}, + osInterface: container.RealOS{}, + } got, err := c.removeExcessLogs(testLogs) require.NoError(t, err) require.Len(t, got, len(test.expect)) @@ -253,7 +331,7 @@ func TestCompressLog(t *testing.T) { require.NoError(t, err) testLog := testFile.Name() - c := &containerLogManager{} + c := &containerLogManager{osInterface: container.RealOS{}} require.NoError(t, c.compressLog(testLog)) _, err = os.Stat(testLog + compressSuffix) assert.NoError(t, err, "log should be compressed") @@ -303,6 +381,7 @@ func TestRotateLatestLog(t *testing.T) { c := &containerLogManager{ runtimeService: f, policy: LogRotatePolicy{MaxFiles: test.maxFiles}, + osInterface: container.RealOS{}, clock: clock.NewFakeClock(now), } if test.runtimeError != nil {