From 2bb2604f0b0d55bcc624030fbea6dd4d139c96a2 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 7 Dec 2016 15:56:06 -0500 Subject: [PATCH] Limit the size of the termination log and allow log input Enforce the following limits: 12kb for total message length in container status 4kb for the termination message path file 2kb or 80 lines (whichever is shorter) from the log on error Fallback to log output if the user requests it. --- hack/.linted_packages | 1 + pkg/kubelet/container/runtime.go | 16 +++ pkg/kubelet/dockertools/BUILD | 2 + pkg/kubelet/dockertools/docker_manager.go | 62 +++++++++--- pkg/kubelet/dockertools/labels.go | 16 +-- pkg/kubelet/kuberuntime/BUILD | 2 + .../kuberuntime/kuberuntime_container.go | 63 +++++++----- pkg/kubelet/kuberuntime/kuberuntime_logs.go | 40 +------- .../kuberuntime/kuberuntime_logs_test.go | 26 ----- pkg/kubelet/kuberuntime/labels.go | 16 +-- pkg/kubelet/status/status_manager.go | 7 ++ pkg/kubelet/status/status_manager_test.go | 44 ++++++++- pkg/util/tail/BUILD | 19 +++- pkg/util/tail/tail.go | 99 +++++++++++++++++++ pkg/util/tail/tail_test.go | 52 ++++++++++ 15 files changed, 347 insertions(+), 118 deletions(-) create mode 100644 pkg/util/tail/tail.go create mode 100644 pkg/util/tail/tail_test.go diff --git a/hack/.linted_packages b/hack/.linted_packages index cecfd019d3c..a1d4546aaa8 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -289,6 +289,7 @@ pkg/util/replicaset pkg/util/restoptions pkg/util/runtime pkg/util/sets +pkg/util/tail pkg/util/validation pkg/util/validation/field pkg/util/version diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 3a3650dbbea..dab4f4ec21e 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -626,3 +626,19 @@ func (s SortContainerStatusesByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], func (s SortContainerStatusesByCreationTime) Less(i, j int) bool { return s[i].CreatedAt.Before(s[j].CreatedAt) } + +const ( + // MaxPodTerminationMessageLogLength is the maximum bytes any one pod may have written + // as termination message output across all containers. Containers will be evenly truncated + // until output is below this limit. + MaxPodTerminationMessageLogLength = 1024 * 12 + // MaxContainerTerminationMessageLength is the upper bound any one container may write to + // its termination message path. Contents above this length will be truncated. + MaxContainerTerminationMessageLength = 1024 * 4 + // MaxContainerTerminationMessageLogLength is the maximum bytes any one container will + // have written to its termination message when the message is read from the logs. + MaxContainerTerminationMessageLogLength = 1024 * 2 + // MaxContainerTerminationMessageLogLines is the maximum number of previous lines of + // log output that the termination message can contain. + MaxContainerTerminationMessageLogLines = 80 +) diff --git a/pkg/kubelet/dockertools/BUILD b/pkg/kubelet/dockertools/BUILD index d574b112434..25bf8539af0 100644 --- a/pkg/kubelet/dockertools/BUILD +++ b/pkg/kubelet/dockertools/BUILD @@ -52,8 +52,10 @@ go_library( "//pkg/util/procfs:go_default_library", "//pkg/util/selinux:go_default_library", "//pkg/util/strings:go_default_library", + "//pkg/util/tail:go_default_library", "//pkg/util/term:go_default_library", "//pkg/util/version:go_default_library", + "//vendor:github.com/armon/circbuf", "//vendor:github.com/docker/distribution/digest", "//vendor:github.com/docker/distribution/reference", "//vendor:github.com/docker/docker/pkg/jsonmessage", diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 653a6fed4cb..d972dea1990 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -33,6 +33,7 @@ import ( "sync" "time" + "github.com/armon/circbuf" dockertypes "github.com/docker/engine-api/types" dockercontainer "github.com/docker/engine-api/types/container" dockerstrslice "github.com/docker/engine-api/types/strslice" @@ -70,6 +71,7 @@ import ( "k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/selinux" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/tail" "k8s.io/kubernetes/pkg/util/term" utilversion "k8s.io/kubernetes/pkg/util/version" ) @@ -482,19 +484,12 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin startedAt = createdAt } - terminationMessagePath := containerInfo.TerminationMessagePath - if terminationMessagePath != "" { - for _, mount := range iResult.Mounts { - if mount.Destination == terminationMessagePath { - path := mount.Source - if data, err := ioutil.ReadFile(path); err != nil { - message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err) - } else { - message = string(data) - } - } - } + // retrieve the termination message from logs, file, or file with fallback to logs in case of failure + fallbackToLogs := containerInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (iResult.State.ExitCode != 0 || iResult.State.OOMKilled) + if msg := getTerminationMessage(dm.c, iResult, containerInfo.TerminationMessagePath, fallbackToLogs); len(msg) > 0 { + message = msg } + status.State = kubecontainer.ContainerStateExited status.Message = message status.Reason = reason @@ -508,6 +503,49 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin return &status, "", nil } +func getTerminationMessage(c DockerInterface, iResult *dockertypes.ContainerJSON, terminationMessagePath string, fallbackToLogs bool) string { + if len(terminationMessagePath) != 0 { + for _, mount := range iResult.Mounts { + if mount.Destination != terminationMessagePath { + continue + } + path := mount.Source + data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength) + if err != nil { + return fmt.Sprintf("Error on reading termination log %s: %v", path, err) + } + if !fallbackToLogs || len(data) != 0 { + return string(data) + } + } + } + if !fallbackToLogs { + return "" + } + + return readLastStringFromContainerLogs(c, iResult.Name) +} + +// readLastStringFromContainerLogs attempts to a certain amount from the end of the logs for containerName. +// It will attempt to avoid reading excessive logs from the server, which may result in underestimating the amount +// of logs to fetch (such that the length of the response message is < max). +func readLastStringFromContainerLogs(c DockerInterface, containerName string) string { + logOptions := dockertypes.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + } + buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) + streamOptions := StreamOptions{ + ErrorStream: buf, + OutputStream: buf, + } + logOptions.Tail = strconv.FormatInt(kubecontainer.MaxContainerTerminationMessageLogLines, 10) + if err := c.Logs(containerName, logOptions, streamOptions); err != nil { + return fmt.Sprintf("Error on reading termination message from logs: %v", err) + } + return buf.String() +} + // makeEnvList converts EnvVar list to a list of strings, in the form of // '=', which can be understood by docker. func makeEnvList(envs []kubecontainer.EnvVar) (result []string) { diff --git a/pkg/kubelet/dockertools/labels.go b/pkg/kubelet/dockertools/labels.go index 3e453cbfaea..37edd50fde7 100644 --- a/pkg/kubelet/dockertools/labels.go +++ b/pkg/kubelet/dockertools/labels.go @@ -39,11 +39,12 @@ const ( kubernetesPodDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod" kubernetesPodTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod" - kubernetesContainerHashLabel = "io.kubernetes.container.hash" - kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount" - kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" - kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" - kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4 + kubernetesContainerHashLabel = "io.kubernetes.container.hash" + kubernetesContainerRestartCountLabel = "io.kubernetes.container.restartCount" + kubernetesContainerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" + kubernetesContainerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy" + kubernetesContainerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" + kubernetesContainerPortsLabel = "io.kubernetes.container.ports" // Added in 1.4 // TODO(random-liu): Keep this for old containers, remove this when we drop support for v1.1. kubernetesPodLabel = "io.kubernetes.pod.data" @@ -63,6 +64,7 @@ type labelledContainerInfo struct { Hash string RestartCount int TerminationMessagePath string + TerminationMessagePolicy v1.TerminationMessagePolicy PreStopHandler *v1.Handler Ports []v1.ContainerPort } @@ -83,6 +85,7 @@ func newLabels(container *v1.Container, pod *v1.Pod, restartCount int, enableCus labels[kubernetesContainerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16) labels[kubernetesContainerRestartCountLabel] = strconv.Itoa(restartCount) labels[kubernetesContainerTerminationMessagePathLabel] = container.TerminationMessagePath + labels[kubernetesContainerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) if container.Lifecycle != nil && container.Lifecycle.PreStop != nil { // Using json enconding so that the PreStop handler object is readable after writing as a label rawPreStop, err := json.Marshal(container.Lifecycle.PreStop) @@ -118,7 +121,8 @@ func getContainerInfoFromLabel(labels map[string]string) *labelledContainerInfo PodUID: kubetypes.UID(getStringValueFromLabel(labels, types.KubernetesPodUIDLabel)), Name: getStringValueFromLabel(labels, types.KubernetesContainerNameLabel), Hash: getStringValueFromLabel(labels, kubernetesContainerHashLabel), - TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel), + TerminationMessagePath: getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePathLabel), + TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(labels, kubernetesContainerTerminationMessagePolicyLabel)), } if containerInfo.RestartCount, err = getIntValueFromLabel(labels, kubernetesContainerRestartCountLabel); err != nil { logError(containerInfo, kubernetesContainerRestartCountLabel, err) diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index 25619ef2b3d..6861064cfed 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -47,7 +47,9 @@ go_library( "//pkg/securitycontext:go_default_library", "//pkg/util/parsers:go_default_library", "//pkg/util/selinux:go_default_library", + "//pkg/util/tail:go_default_library", "//pkg/util/version:go_default_library", + "//vendor:github.com/armon/circbuf", "//vendor:github.com/docker/docker/pkg/jsonlog", "//vendor:github.com/fsnotify/fsnotify", "//vendor:github.com/golang/glog", diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 529d0ba5e5a..128d52292f3 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -19,7 +19,6 @@ package kuberuntime import ( "fmt" "io" - "io/ioutil" "math/rand" "net/url" "os" @@ -28,7 +27,9 @@ import ( "sync" "time" + "github.com/armon/circbuf" "github.com/golang/glog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/util/selinux" + "k8s.io/kubernetes/pkg/util/tail" ) // startContainer starts a container and returns a message indicates why it is failed on error. @@ -334,29 +336,36 @@ func makeUID() string { return fmt.Sprintf("%08x", rand.Uint32()) } -// getTerminationMessage gets termination message of the container. -func getTerminationMessage(status *runtimeapi.ContainerStatus, kubeStatus *kubecontainer.ContainerStatus, terminationMessagePath string) string { - message := "" - - if !kubeStatus.FinishedAt.IsZero() || kubeStatus.ExitCode != 0 { - if terminationMessagePath == "" { - return "" - } - +// getTerminationMessage looks on the filesystem for the provided termination message path, returning a limited +// amount of those bytes, or returns true if the logs should be checked. +func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessagePath string, fallbackToLogs bool) (string, bool) { + if len(terminationMessagePath) != 0 { for _, mount := range status.Mounts { - if mount.ContainerPath == terminationMessagePath { - path := mount.HostPath - if data, err := ioutil.ReadFile(path); err != nil { - message = fmt.Sprintf("Error on reading termination-log %s: %v", path, err) - } else { - message = string(data) - } - break + if mount.ContainerPath != terminationMessagePath { + continue + } + path := mount.HostPath + data, _, err := tail.ReadAtMost(path, kubecontainer.MaxContainerTerminationMessageLength) + if err != nil { + return fmt.Sprintf("Error on reading termination log %s: %v", path, err), false + } + if !fallbackToLogs || len(data) != 0 { + return string(data), false } } } + return "", fallbackToLogs +} - return message +// readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented +// by path. It reads up to max log lines. +func readLastStringFromContainerLogs(path string) string { + value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) + buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) + if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { + return fmt.Sprintf("Error on reading termination message from logs: %v", err) + } + return buf.String() } // getPodContainerStatuses gets all containers' statuses for the pod. @@ -402,13 +411,19 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n cStatus.Message = status.Message cStatus.ExitCode = int(status.ExitCode) cStatus.FinishedAt = time.Unix(0, status.FinishedAt) + + fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError && (cStatus.ExitCode != 0 || cStatus.Reason == "OOMKilled") + tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs) + if checkLogs { + path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount) + tMessage = readLastStringFromContainerLogs(path) + } + // Use the termination message written by the application is not empty + if len(tMessage) != 0 { + cStatus.Message = tMessage + } } - tMessage := getTerminationMessage(status, cStatus, annotatedInfo.TerminationMessagePath) - // Use the termination message written by the application is not empty - if len(tMessage) != 0 { - cStatus.Message = tMessage - } statuses[i] = cStatus } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs.go b/pkg/kubelet/kuberuntime/kuberuntime_logs.go index 3cc0eb17640..21d81bc6a04 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs.go @@ -32,6 +32,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/util/tail" ) // Notice that the current kuberuntime logs implementation doesn't handle @@ -120,7 +121,7 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) opts := newLogOptions(apiOpts, time.Now()) // Search start point based on tail line. - start, err := tail(f, opts.tail) + start, err := tail.FindTailLineStartIndex(f, opts.tail) if err != nil { return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.tail, path, err) } @@ -347,40 +348,3 @@ func (w *logWriter) write(msg *logMessage) error { } return nil } - -// tail returns the start of last nth line. -// * If n < 0, return the beginning of the file. -// * If n >= 0, return the beginning of last nth line. -// Notice that if the last line is incomplete (no end-of-line), it will not be counted -// as one line. -func tail(f io.ReadSeeker, n int64) (int64, error) { - if n < 0 { - return 0, nil - } - size, err := f.Seek(0, os.SEEK_END) - if err != nil { - return 0, err - } - var left, cnt int64 - buf := make([]byte, blockSize) - for right := size; right > 0 && cnt <= n; right -= blockSize { - left = right - blockSize - if left < 0 { - left = 0 - buf = make([]byte, right) - } - if _, err := f.Seek(left, os.SEEK_SET); err != nil { - return 0, err - } - if _, err := f.Read(buf); err != nil { - return 0, err - } - cnt += int64(bytes.Count(buf, eol)) - } - for ; cnt > n; cnt-- { - idx := bytes.Index(buf, eol) + 1 - buf = buf[idx:] - left += int64(idx) - } - return left, nil -} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go index 9fb07d6aadb..1602827a833 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs_test.go @@ -18,7 +18,6 @@ package kuberuntime import ( "bytes" - "strings" "testing" "time" @@ -242,28 +241,3 @@ func TestWriteLogsWithBytesLimit(t *testing.T) { assert.Equal(t, test.expectStderr, stderrBuf.String()) } } - -func TestTail(t *testing.T) { - line := strings.Repeat("a", blockSize) - testBytes := []byte(line + "\n" + - line + "\n" + - line + "\n" + - line + "\n" + - line[blockSize/2:]) // incomplete line - - for c, test := range []struct { - n int64 - start int64 - }{ - {n: -1, start: 0}, - {n: 0, start: int64(len(line)+1) * 4}, - {n: 1, start: int64(len(line)+1) * 3}, - {n: 9999, start: 0}, - } { - t.Logf("TestCase #%d: %+v", c, test) - r := bytes.NewReader(testBytes) - s, err := tail(r, test.n) - assert.NoError(t, err) - assert.Equal(t, s, test.start) - } -} diff --git a/pkg/kubelet/kuberuntime/labels.go b/pkg/kubelet/kuberuntime/labels.go index 462442b9091..ccd0dc99b0b 100644 --- a/pkg/kubelet/kuberuntime/labels.go +++ b/pkg/kubelet/kuberuntime/labels.go @@ -33,11 +33,12 @@ const ( podDeletionGracePeriodLabel = "io.kubernetes.pod.deletionGracePeriod" podTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod" - containerHashLabel = "io.kubernetes.container.hash" - containerRestartCountLabel = "io.kubernetes.container.restartCount" - containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" - containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" - containerPortsLabel = "io.kubernetes.container.ports" + containerHashLabel = "io.kubernetes.container.hash" + containerRestartCountLabel = "io.kubernetes.container.restartCount" + containerTerminationMessagePathLabel = "io.kubernetes.container.terminationMessagePath" + containerTerminationMessagePolicyLabel = "io.kubernetes.container.terminationMessagePolicy" + containerPreStopHandlerLabel = "io.kubernetes.container.preStopHandler" + containerPortsLabel = "io.kubernetes.container.ports" // kubernetesManagedLabel is used to distinguish whether a container/sandbox is managed by kubelet or not kubernetesManagedLabel = "io.kubernetes.managed" @@ -69,6 +70,7 @@ type annotatedContainerInfo struct { PodDeletionGracePeriod *int64 PodTerminationGracePeriod *int64 TerminationMessagePath string + TerminationMessagePolicy v1.TerminationMessagePolicy PreStopHandler *v1.Handler ContainerPorts []v1.ContainerPort } @@ -113,6 +115,7 @@ func newContainerAnnotations(container *v1.Container, pod *v1.Pod, restartCount annotations[containerHashLabel] = strconv.FormatUint(kubecontainer.HashContainer(container), 16) annotations[containerRestartCountLabel] = strconv.Itoa(restartCount) annotations[containerTerminationMessagePathLabel] = container.TerminationMessagePath + annotations[containerTerminationMessagePolicyLabel] = string(container.TerminationMessagePolicy) if pod.DeletionGracePeriodSeconds != nil { annotations[podDeletionGracePeriodLabel] = strconv.FormatInt(*pod.DeletionGracePeriodSeconds, 10) @@ -192,7 +195,8 @@ func isManagedByKubelet(labels map[string]string) bool { func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedContainerInfo { var err error containerInfo := &annotatedContainerInfo{ - TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel), + TerminationMessagePath: getStringValueFromLabel(annotations, containerTerminationMessagePathLabel), + TerminationMessagePolicy: v1.TerminationMessagePolicy(getStringValueFromLabel(annotations, containerTerminationMessagePolicyLabel)), } if containerInfo.Hash, err = getUint64ValueFromLabel(annotations, containerHashLabel); err != nil { diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index bcb81613750..47a5f3a632d 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -513,6 +513,10 @@ func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool { // kubelet temporarily. // TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent. func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { + bytesPerStatus := kubecontainer.MaxPodTerminationMessageLogLength + if containers := len(pod.Spec.Containers) + len(pod.Spec.InitContainers); containers > 0 { + bytesPerStatus = bytesPerStatus / containers + } normalizeTimeStamp := func(t *metav1.Time) { *t = t.Rfc3339Copy() } @@ -523,6 +527,9 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus { if c.Terminated != nil { normalizeTimeStamp(&c.Terminated.StartedAt) normalizeTimeStamp(&c.Terminated.FinishedAt) + if len(c.Terminated.Message) > bytesPerStatus { + c.Terminated.Message = c.Terminated.Message[:bytesPerStatus] + } } } diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index c4b5a2b7594..9540a3a9d86 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -20,21 +20,21 @@ import ( "fmt" "math/rand" "strconv" + "strings" "testing" "time" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - "k8s.io/kubernetes/pkg/client/testing/core" - "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + "k8s.io/kubernetes/pkg/client/testing/core" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -480,6 +480,40 @@ func TestStatusEquality(t *testing.T) { } } +func TestStatusNormalizationEnforcesMaxBytes(t *testing.T) { + pod := v1.Pod{ + Spec: v1.PodSpec{}, + } + containerStatus := []v1.ContainerStatus{} + for i := 0; i < 48; i++ { + s := v1.ContainerStatus{ + Name: fmt.Sprintf("container%d", i), + LastTerminationState: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: strings.Repeat("abcdefgh", int(24+i%3)), + }, + }, + } + containerStatus = append(containerStatus, s) + } + podStatus := v1.PodStatus{ + InitContainerStatuses: containerStatus[:24], + ContainerStatuses: containerStatus[24:], + } + result := normalizeStatus(&pod, &podStatus) + count := 0 + for _, s := range result.InitContainerStatuses { + l := len(s.LastTerminationState.Terminated.Message) + if l < 192 || l > 256 { + t.Errorf("container message had length %d", l) + } + count += l + } + if count > kubecontainer.MaxPodTerminationMessageLogLength { + t.Errorf("message length not truncated") + } +} + func TestStaticPod(t *testing.T) { staticPod := getTestPod() staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} diff --git a/pkg/util/tail/BUILD b/pkg/util/tail/BUILD index a7403fd473c..5d7a1b54c4f 100644 --- a/pkg/util/tail/BUILD +++ b/pkg/util/tail/BUILD @@ -2,7 +2,11 @@ package(default_visibility = ["//visibility:public"]) licenses(["notice"]) -load("@io_bazel_rules_go//go:def.bzl") +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) filegroup( name = "package-srcs", @@ -16,3 +20,16 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["tail_test.go"], + library = ":go_default_library", + tags = ["automanaged"], +) + +go_library( + name = "go_default_library", + srcs = ["tail.go"], + tags = ["automanaged"], +) diff --git a/pkg/util/tail/tail.go b/pkg/util/tail/tail.go new file mode 100644 index 00000000000..23ad4ae7911 --- /dev/null +++ b/pkg/util/tail/tail.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tail + +import ( + "bytes" + "io" + "io/ioutil" + "os" +) + +const ( + // blockSize is the block size used in tail. + blockSize = 1024 +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} +) + +// ReadAtMost reads at most max bytes from the end of the file identified by path or +// returns an error. It returns true if the file was longer than max. It will +// allocate up to max bytes. +func ReadAtMost(path string, max int64) ([]byte, bool, error) { + f, err := os.Open(path) + if err != nil { + return nil, false, err + } + defer f.Close() + fi, err := f.Stat() + if err != nil { + return nil, false, err + } + size := fi.Size() + if size == 0 { + return nil, false, nil + } + if size < max { + max = size + } + offset, err := f.Seek(-max, os.SEEK_END) + if err != nil { + return nil, false, err + } + data, err := ioutil.ReadAll(f) + return data, offset > 0, err +} + +// FindTailLineStartIndex returns the start of last nth line. +// * If n < 0, return the beginning of the file. +// * If n >= 0, return the beginning of last nth line. +// Notice that if the last line is incomplete (no end-of-line), it will not be counted +// as one line. +func FindTailLineStartIndex(f io.ReadSeeker, n int64) (int64, error) { + if n < 0 { + return 0, nil + } + size, err := f.Seek(0, os.SEEK_END) + if err != nil { + return 0, err + } + var left, cnt int64 + buf := make([]byte, blockSize) + for right := size; right > 0 && cnt <= n; right -= blockSize { + left = right - blockSize + if left < 0 { + left = 0 + buf = make([]byte, right) + } + if _, err := f.Seek(left, os.SEEK_SET); err != nil { + return 0, err + } + if _, err := f.Read(buf); err != nil { + return 0, err + } + cnt += int64(bytes.Count(buf, eol)) + } + for ; cnt > n; cnt-- { + idx := bytes.Index(buf, eol) + 1 + buf = buf[idx:] + left += int64(idx) + } + return left, nil +} diff --git a/pkg/util/tail/tail_test.go b/pkg/util/tail/tail_test.go new file mode 100644 index 00000000000..18d3f90c21a --- /dev/null +++ b/pkg/util/tail/tail_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tail + +import ( + "bytes" + "strings" + "testing" +) + +func TestTail(t *testing.T) { + line := strings.Repeat("a", blockSize) + testBytes := []byte(line + "\n" + + line + "\n" + + line + "\n" + + line + "\n" + + line[blockSize/2:]) // incomplete line + + for c, test := range []struct { + n int64 + start int64 + }{ + {n: -1, start: 0}, + {n: 0, start: int64(len(line)+1) * 4}, + {n: 1, start: int64(len(line)+1) * 3}, + {n: 9999, start: 0}, + } { + t.Logf("TestCase #%d: %+v", c, test) + r := bytes.NewReader(testBytes) + s, err := FindTailLineStartIndex(r, test.n) + if err != nil { + t.Error(err) + } + if s != test.start { + t.Errorf("%d != %d", s, test.start) + } + } +}