From 022f7c2cd75f984ef0a3a3a7a826ef723a65b555 Mon Sep 17 00:00:00 2001 From: Robert Krawitz Date: Mon, 3 Dec 2018 18:27:53 -0500 Subject: [PATCH] Clean up PR #71617 --- pkg/kubelet/kuberuntime/BUILD | 1 + .../kuberuntime/fake_kuberuntime_manager.go | 2 + .../kuberuntime/kuberuntime_manager.go | 32 ++------ pkg/kubelet/remote/BUILD | 1 + pkg/kubelet/remote/remote_runtime.go | 54 +++---------- pkg/kubelet/util/BUILD | 1 + pkg/kubelet/util/logreduction/BUILD | 28 +++++++ pkg/kubelet/util/logreduction/logreduction.go | 78 +++++++++++++++++++ .../util/logreduction/logreduction_test.go | 70 +++++++++++++++++ 9 files changed, 195 insertions(+), 72 deletions(-) create mode 100644 pkg/kubelet/util/logreduction/BUILD create mode 100644 pkg/kubelet/util/logreduction/logreduction.go create mode 100644 pkg/kubelet/util/logreduction/logreduction_test.go diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index e8c5fd3a5ef..5c5e5783b76 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -47,6 +47,7 @@ go_library( "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/kubelet/util/format:go_default_library", + "//pkg/kubelet/util/logreduction:go_default_library", "//pkg/security/apparmor:go_default_library", "//pkg/securitycontext:go_default_library", "//pkg/util/parsers:go_default_library", diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index 61a4e411282..2f2a64f9d45 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + "k8s.io/kubernetes/pkg/kubelet/util/logreduction" ) const ( @@ -86,6 +87,7 @@ func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS keyring: keyring, seccompProfileRoot: fakeSeccompProfileRoot, internalLifecycle: cm.NewFakeInternalContainerLifecycle(), + logReduction: logreduction.NewLogReduction(identicalErrorDelay), } typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 115347b77e1..6d84aac3b19 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "os" - "sync" "time" cadvisorapi "github.com/google/cadvisor/info/v1" @@ -48,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/cache" "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/kubelet/util/logreduction" ) const ( @@ -128,11 +128,7 @@ type kubeGenericRuntimeManager struct { runtimeClassManager *runtimeclass.Manager // Cache last per-container error message to reduce log spam - lastError map[string]string - - // Time last per-container error message was printed - errorPrinted map[string]time.Time - errorMapLock sync.Mutex + logReduction *logreduction.LogReduction } // KubeGenericRuntime is a interface contains interfaces for container runtime and command. @@ -187,8 +183,7 @@ func NewKubeGenericRuntimeManager( internalLifecycle: internalLifecycle, legacyLogProvider: legacyLogProvider, runtimeClassManager: runtimeClassManager, - lastError: make(map[string]string), - errorPrinted: make(map[string]time.Time), + logReduction: logreduction.NewLogReduction(identicalErrorDelay), } typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion) @@ -850,17 +845,6 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPo return } -func (m *kubeGenericRuntimeManager) cleanupErrorTimeouts() { - m.errorMapLock.Lock() - defer m.errorMapLock.Unlock() - for name, timeout := range m.errorPrinted { - if time.Since(timeout) >= identicalErrorDelay { - delete(m.errorPrinted, name) - delete(m.lastError, name) - } - } -} - // GetPodStatus retrieves the status of the pod, including the // information of all containers in the pod that are visible in Runtime. func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) { @@ -909,19 +893,13 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp // Get statuses of all containers visible in the pod. containerStatuses, err := m.getPodContainerStatuses(uid, name, namespace) - m.errorMapLock.Lock() - defer m.errorMapLock.Unlock() if err != nil { - lastMsg, ok := m.lastError[podFullName] - if !ok || err.Error() != lastMsg || time.Since(m.errorPrinted[podFullName]) >= identicalErrorDelay { + if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) { klog.Errorf("getPodContainerStatuses for pod %q failed: %v", podFullName, err) - m.errorPrinted[podFullName] = time.Now() - m.lastError[podFullName] = err.Error() } return nil, err } - delete(m.errorPrinted, podFullName) - delete(m.lastError, podFullName) + m.logReduction.ClearID(podFullName) return &kubecontainer.PodStatus{ ID: uid, diff --git a/pkg/kubelet/remote/BUILD b/pkg/kubelet/remote/BUILD index d407dfdfd9e..b607db4aeff 100644 --- a/pkg/kubelet/remote/BUILD +++ b/pkg/kubelet/remote/BUILD @@ -17,6 +17,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/kubelet/remote", deps = [ "//pkg/kubelet/util:go_default_library", + "//pkg/kubelet/util/logreduction:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis:go_default_library", "//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index 78119b91e48..cf09dad4c12 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "strings" - "sync" "time" "google.golang.org/grpc" @@ -30,6 +29,7 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/util" + "k8s.io/kubernetes/pkg/kubelet/util/logreduction" utilexec "k8s.io/utils/exec" ) @@ -38,10 +38,7 @@ type RemoteRuntimeService struct { timeout time.Duration runtimeClient runtimeapi.RuntimeServiceClient // Cache last per-container error message to reduce log spam - lastError map[string]string - // Time last per-container error message was printed - errorPrinted map[string]time.Time - errorMapLock sync.Mutex + logReduction *logreduction.LogReduction } const ( @@ -68,8 +65,7 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) ( return &RemoteRuntimeService{ timeout: connectionTimeout, runtimeClient: runtimeapi.NewRuntimeServiceClient(conn), - lastError: make(map[string]string), - errorPrinted: make(map[string]time.Time), + logReduction: logreduction.NewLogReduction(identicalErrorDelay), }, nil } @@ -238,10 +234,7 @@ func (r *RemoteRuntimeService) StopContainer(containerID string, timeout int64) ctx, cancel := getContextWithTimeout(t) defer cancel() - r.errorMapLock.Lock() - delete(r.lastError, containerID) - delete(r.errorPrinted, containerID) - r.errorMapLock.Unlock() + r.logReduction.ClearID(containerID) _, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{ ContainerId: containerID, Timeout: timeout, @@ -260,10 +253,7 @@ func (r *RemoteRuntimeService) RemoveContainer(containerID string) error { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() - r.errorMapLock.Lock() - delete(r.lastError, containerID) - delete(r.errorPrinted, containerID) - r.errorMapLock.Unlock() + r.logReduction.ClearID(containerID) _, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{ ContainerId: containerID, }) @@ -291,18 +281,6 @@ func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter return resp.Containers, nil } -// Clean up any expired last-error timers -func (r *RemoteRuntimeService) cleanupErrorTimeouts() { - r.errorMapLock.Lock() - defer r.errorMapLock.Unlock() - for ID, timeout := range r.errorPrinted { - if time.Since(timeout) >= identicalErrorDelay { - delete(r.lastError, ID) - delete(r.errorPrinted, ID) - } - } -} - // ContainerStatus returns the container status. func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error) { ctx, cancel := getContextWithTimeout(r.timeout) @@ -311,21 +289,14 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeapi. resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ ContainerId: containerID, }) - r.cleanupErrorTimeouts() - r.errorMapLock.Lock() - defer r.errorMapLock.Unlock() if err != nil { // Don't spam the log with endless messages about the same failure. - lastMsg, ok := r.lastError[containerID] - if !ok || err.Error() != lastMsg || time.Since(r.errorPrinted[containerID]) >= identicalErrorDelay { + if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) { klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err) - r.errorPrinted[containerID] = time.Now() - r.lastError[containerID] = err.Error() } return nil, err } - delete(r.lastError, containerID) - delete(r.errorPrinted, containerID) + r.logReduction.ClearID(containerID) if resp.Status != nil { if err := verifyContainerStatus(resp.Status); err != nil { @@ -500,20 +471,13 @@ func (r *RemoteRuntimeService) ContainerStats(containerID string) (*runtimeapi.C resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{ ContainerId: containerID, }) - r.cleanupErrorTimeouts() - r.errorMapLock.Lock() - defer r.errorMapLock.Unlock() if err != nil { - lastMsg, ok := r.lastError[containerID] - if !ok || err.Error() != lastMsg || time.Since(r.errorPrinted[containerID]) >= identicalErrorDelay { + if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) { klog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err) - r.errorPrinted[containerID] = time.Now() - r.lastError[containerID] = err.Error() } return nil, err } - delete(r.lastError, containerID) - delete(r.errorPrinted, containerID) + r.logReduction.ClearID(containerID) return resp.GetStats(), nil } diff --git a/pkg/kubelet/util/BUILD b/pkg/kubelet/util/BUILD index 7b590024861..d259accde7c 100644 --- a/pkg/kubelet/util/BUILD +++ b/pkg/kubelet/util/BUILD @@ -79,6 +79,7 @@ filegroup( "//pkg/kubelet/util/cache:all-srcs", "//pkg/kubelet/util/format:all-srcs", "//pkg/kubelet/util/ioutils:all-srcs", + "//pkg/kubelet/util/logreduction:all-srcs", "//pkg/kubelet/util/manager:all-srcs", "//pkg/kubelet/util/pluginwatcher:all-srcs", "//pkg/kubelet/util/queue:all-srcs", diff --git a/pkg/kubelet/util/logreduction/BUILD b/pkg/kubelet/util/logreduction/BUILD new file mode 100644 index 00000000000..6b369981d2a --- /dev/null +++ b/pkg/kubelet/util/logreduction/BUILD @@ -0,0 +1,28 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["logreduction.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/util/logreduction", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["logreduction_test.go"], + embed = [":go_default_library"], +) diff --git a/pkg/kubelet/util/logreduction/logreduction.go b/pkg/kubelet/util/logreduction/logreduction.go new file mode 100644 index 00000000000..6534a5a64b8 --- /dev/null +++ b/pkg/kubelet/util/logreduction/logreduction.go @@ -0,0 +1,78 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logreduction + +import ( + "sync" + "time" +) + +var nowfunc = func() time.Time { return time.Now() } + +// LogReduction provides a filter for consecutive identical log messages; +// a message will be printed no more than once per interval. +// If a string of messages is interrupted by a different message, +// the interval timer will be reset. +type LogReduction struct { + lastError map[string]string + errorPrinted map[string]time.Time + errorMapLock sync.Mutex + identicalErrorDelay time.Duration +} + +// NewLogReduction returns an initialized LogReduction +func NewLogReduction(identicalErrorDelay time.Duration) *LogReduction { + l := new(LogReduction) + l.lastError = make(map[string]string) + l.errorPrinted = make(map[string]time.Time) + l.identicalErrorDelay = identicalErrorDelay + return l +} + +func (l *LogReduction) cleanupErrorTimeouts() { + for name, timeout := range l.errorPrinted { + if nowfunc().Sub(timeout) >= l.identicalErrorDelay { + delete(l.errorPrinted, name) + delete(l.lastError, name) + } + } +} + +// ShouldMessageBePrinted determines whether a message should be printed based +// on how long ago this particular message was last printed +func (l *LogReduction) ShouldMessageBePrinted(message string, parentID string) bool { + l.errorMapLock.Lock() + defer l.errorMapLock.Unlock() + l.cleanupErrorTimeouts() + lastMsg, ok := l.lastError[parentID] + lastPrinted, ok1 := l.errorPrinted[parentID] + if !ok || !ok1 || message != lastMsg || nowfunc().Sub(lastPrinted) >= l.identicalErrorDelay { + l.errorPrinted[parentID] = nowfunc() + l.lastError[parentID] = message + return true + } + return false +} + +// ClearID clears out log reduction records pertaining to a particular parent +// (e. g. container ID) +func (l *LogReduction) ClearID(parentID string) { + l.errorMapLock.Lock() + defer l.errorMapLock.Unlock() + delete(l.lastError, parentID) + delete(l.errorPrinted, parentID) +} diff --git a/pkg/kubelet/util/logreduction/logreduction_test.go b/pkg/kubelet/util/logreduction/logreduction_test.go new file mode 100644 index 00000000000..08ae1de70ad --- /dev/null +++ b/pkg/kubelet/util/logreduction/logreduction_test.go @@ -0,0 +1,70 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logreduction + +import ( + "testing" + "time" +) + +var time0 = time.Unix(1000, 0) +var time1 = time.Unix(1001, 0) +var time2 = time.Unix(1012, 0) +var identicalErrorDelay = 10 * time.Second +var testCount = 0 + +const ( + mesg1 = "This is a message" + mesg2 = "This is not a message" + id1 = "Container1" + id2 = "Container2" +) + +func checkThat(t *testing.T, r *LogReduction, m, id string) { + testCount++ + if !r.ShouldMessageBePrinted(m, id) { + t.Errorf("Case %d failed (%s/%s should be printed)", testCount, m, id) + } +} + +func checkThatNot(t *testing.T, r *LogReduction, m, id string) { + testCount++ + if r.ShouldMessageBePrinted(m, id) { + t.Errorf("Case %d failed (%s/%s should not be printed)", testCount, m, id) + } +} + +func TestLogReduction(t *testing.T) { + var timeToReturn = time0 + nowfunc = func() time.Time { return timeToReturn } + r := NewLogReduction(identicalErrorDelay) + checkThat(t, r, mesg1, id1) // 1 + checkThatNot(t, r, mesg1, id1) // 2 + checkThat(t, r, mesg1, id2) // 3 + checkThatNot(t, r, mesg1, id1) // 4 + timeToReturn = time1 + checkThatNot(t, r, mesg1, id1) // 5 + timeToReturn = time2 + checkThat(t, r, mesg1, id1) // 6 + checkThatNot(t, r, mesg1, id1) // 7 + checkThat(t, r, mesg2, id1) // 8 + checkThat(t, r, mesg1, id1) // 9 + checkThat(t, r, mesg1, id2) // 10 + r.ClearID(id1) + checkThat(t, r, mesg1, id1) // 11 + checkThatNot(t, r, mesg1, id2) // 12 +}