mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #113481 from rphillips/fixes/77063
kubelet: fix pod log line corruption when using timestamps and long lines
This commit is contained in:
commit
5899432f92
@ -237,13 +237,13 @@ func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWrit
|
||||
}
|
||||
|
||||
// writeLogs writes logs into stdout, stderr.
|
||||
func (w *logWriter) write(msg *logMessage) error {
|
||||
func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
|
||||
if msg.timestamp.Before(w.opts.since) {
|
||||
// Skip the line because it's older than since
|
||||
return nil
|
||||
}
|
||||
line := msg.log
|
||||
if w.opts.timestamp {
|
||||
if w.opts.timestamp && addPrefix {
|
||||
prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0])
|
||||
line = append(prefix, line...)
|
||||
}
|
||||
@ -314,6 +314,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
||||
var watcher *fsnotify.Watcher
|
||||
var parse parseFunc
|
||||
var stop bool
|
||||
isNewLine := true
|
||||
found := true
|
||||
writer := newLogWriter(stdout, stderr, opts)
|
||||
msg := &logMessage{}
|
||||
@ -399,7 +400,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
||||
continue
|
||||
}
|
||||
// Write the log line into the stream.
|
||||
if err := writer.write(msg); err != nil {
|
||||
if err := writer.write(msg, isNewLine); err != nil {
|
||||
if err == errMaximumWrite {
|
||||
klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
|
||||
return nil
|
||||
@ -410,7 +411,11 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
||||
if limitedMode {
|
||||
limitedNum--
|
||||
}
|
||||
|
||||
if len(msg.log) > 0 {
|
||||
isNewLine = msg.log[len(msg.log)-1] == eol[0]
|
||||
} else {
|
||||
isNewLine = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,8 +17,12 @@ limitations under the License.
|
||||
package logs
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
@ -320,7 +324,7 @@ func TestWriteLogs(t *testing.T) {
|
||||
stdoutBuf := bytes.NewBuffer(nil)
|
||||
stderrBuf := bytes.NewBuffer(nil)
|
||||
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{since: test.since, timestamp: test.timestamp, bytes: -1})
|
||||
err := w.write(msg)
|
||||
err := w.write(msg, true)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.expectStdout, stdoutBuf.String())
|
||||
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
||||
@ -384,13 +388,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
|
||||
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)})
|
||||
for i := 0; i < test.stdoutLines; i++ {
|
||||
msg.stream = runtimeapi.Stdout
|
||||
if err := w.write(msg); err != nil {
|
||||
if err := w.write(msg, true); err != nil {
|
||||
assert.EqualError(t, err, errMaximumWrite.Error())
|
||||
}
|
||||
}
|
||||
for i := 0; i < test.stderrLines; i++ {
|
||||
msg.stream = runtimeapi.Stderr
|
||||
if err := w.write(msg); err != nil {
|
||||
if err := w.write(msg, true); err != nil {
|
||||
assert.EqualError(t, err, errMaximumWrite.Error())
|
||||
}
|
||||
}
|
||||
@ -398,3 +402,54 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
|
||||
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadLogsLimitsWithTimestamps(t *testing.T) {
|
||||
logLineFmt := "2022-10-29T16:10:22.592603036-05:00 stdout P %v\n"
|
||||
logLineNewLine := "2022-10-29T16:10:22.592603036-05:00 stdout F \n"
|
||||
|
||||
tmpfile, err := ioutil.TempFile("", "log.*.txt")
|
||||
assert.NoError(t, err)
|
||||
|
||||
count := 10000
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
tmpfile.WriteString(fmt.Sprintf(logLineFmt, i))
|
||||
}
|
||||
tmpfile.WriteString(logLineNewLine)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
tmpfile.WriteString(fmt.Sprintf(logLineFmt, i))
|
||||
}
|
||||
tmpfile.WriteString(logLineNewLine)
|
||||
|
||||
// two lines are in the buffer
|
||||
|
||||
defer os.Remove(tmpfile.Name()) // clean up
|
||||
|
||||
assert.NoError(t, err)
|
||||
tmpfile.Close()
|
||||
|
||||
var buf bytes.Buffer
|
||||
w := io.MultiWriter(&buf)
|
||||
|
||||
err = ReadLogs(context.Background(), tmpfile.Name(), "", &LogOptions{tail: -1, bytes: -1, timestamp: true}, nil, w, w)
|
||||
assert.NoError(t, err)
|
||||
|
||||
lineCount := 0
|
||||
scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes()))
|
||||
for scanner.Scan() {
|
||||
lineCount++
|
||||
|
||||
// Split the line
|
||||
ts, logline, _ := bytes.Cut(scanner.Bytes(), []byte(" "))
|
||||
|
||||
// Verification
|
||||
// 1. The timestamp should exist
|
||||
// 2. The last item in the log should be 9999
|
||||
_, err = time.Parse(time.RFC3339, string(ts))
|
||||
assert.NoError(t, err, "timestamp not found")
|
||||
assert.Equal(t, true, bytes.HasSuffix(logline, []byte("9999")), "is the complete log found")
|
||||
}
|
||||
|
||||
assert.Equal(t, 2, lineCount, "should have two lines")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user