From ddae396ce35dc9353c9e5e0009e143b4fd6ff6c6 Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Mon, 31 Oct 2022 16:49:08 -0500 Subject: [PATCH] kubelet: fix pod log line corruption when using timestamps and long lines --- pkg/kubelet/kuberuntime/logs/logs.go | 13 +++-- pkg/kubelet/kuberuntime/logs/logs_test.go | 61 +++++++++++++++++++++-- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index c69b557e37d..fc9a46665ee 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -237,13 +237,13 @@ func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWrit } // writeLogs writes logs into stdout, stderr. -func (w *logWriter) write(msg *logMessage) error { +func (w *logWriter) write(msg *logMessage, addPrefix bool) error { if msg.timestamp.Before(w.opts.since) { // Skip the line because it's older than since return nil } line := msg.log - if w.opts.timestamp { + if w.opts.timestamp && addPrefix { prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0]) line = append(prefix, line...) } @@ -314,6 +314,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r var watcher *fsnotify.Watcher var parse parseFunc var stop bool + isNewLine := true found := true writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} @@ -399,7 +400,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r continue } // Write the log line into the stream. - if err := writer.write(msg); err != nil { + if err := writer.write(msg, isNewLine); err != nil { if err == errMaximumWrite { klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes) return nil @@ -410,7 +411,11 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r if limitedMode { limitedNum-- } - + if len(msg.log) > 0 { + isNewLine = msg.log[len(msg.log)-1] == eol[0] + } else { + isNewLine = true + } } } diff --git a/pkg/kubelet/kuberuntime/logs/logs_test.go b/pkg/kubelet/kuberuntime/logs/logs_test.go index 383212f7b83..98f5a49a779 100644 --- a/pkg/kubelet/kuberuntime/logs/logs_test.go +++ b/pkg/kubelet/kuberuntime/logs/logs_test.go @@ -17,8 +17,12 @@ limitations under the License. package logs import ( + "bufio" "bytes" "context" + "fmt" + "io" + "io/ioutil" "os" "testing" "time" @@ -320,7 +324,7 @@ func TestWriteLogs(t *testing.T) { stdoutBuf := bytes.NewBuffer(nil) stderrBuf := bytes.NewBuffer(nil) w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{since: test.since, timestamp: test.timestamp, bytes: -1}) - err := w.write(msg) + err := w.write(msg, true) assert.NoError(t, err) assert.Equal(t, test.expectStdout, stdoutBuf.String()) assert.Equal(t, test.expectStderr, stderrBuf.String()) @@ -384,13 +388,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) { w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)}) for i := 0; i < test.stdoutLines; i++ { msg.stream = runtimeapi.Stdout - if err := w.write(msg); err != nil { + if err := w.write(msg, true); err != nil { assert.EqualError(t, err, errMaximumWrite.Error()) } } for i := 0; i < test.stderrLines; i++ { msg.stream = runtimeapi.Stderr - if err := w.write(msg); err != nil { + if err := w.write(msg, true); err != nil { assert.EqualError(t, err, errMaximumWrite.Error()) } } @@ -398,3 +402,54 @@ func TestWriteLogsWithBytesLimit(t *testing.T) { assert.Equal(t, test.expectStderr, stderrBuf.String()) } } + +func TestReadLogsLimitsWithTimestamps(t *testing.T) { + logLineFmt := "2022-10-29T16:10:22.592603036-05:00 stdout P %v\n" + logLineNewLine := "2022-10-29T16:10:22.592603036-05:00 stdout F \n" + + tmpfile, err := ioutil.TempFile("", "log.*.txt") + assert.NoError(t, err) + + count := 10000 + + for i := 0; i < count; i++ { + tmpfile.WriteString(fmt.Sprintf(logLineFmt, i)) + } + tmpfile.WriteString(logLineNewLine) + + for i := 0; i < count; i++ { + tmpfile.WriteString(fmt.Sprintf(logLineFmt, i)) + } + tmpfile.WriteString(logLineNewLine) + + // two lines are in the buffer + + defer os.Remove(tmpfile.Name()) // clean up + + assert.NoError(t, err) + tmpfile.Close() + + var buf bytes.Buffer + w := io.MultiWriter(&buf) + + err = ReadLogs(context.Background(), tmpfile.Name(), "", &LogOptions{tail: -1, bytes: -1, timestamp: true}, nil, w, w) + assert.NoError(t, err) + + lineCount := 0 + scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes())) + for scanner.Scan() { + lineCount++ + + // Split the line + ts, logline, _ := bytes.Cut(scanner.Bytes(), []byte(" ")) + + // Verification + // 1. The timestamp should exist + // 2. The last item in the log should be 9999 + _, err = time.Parse(time.RFC3339, string(ts)) + assert.NoError(t, err, "timestamp not found") + assert.Equal(t, true, bytes.HasSuffix(logline, []byte("9999")), "is the complete log found") + } + + assert.Equal(t, 2, lineCount, "should have two lines") +}