mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
kubelet: fix pod log line corruption when using timestamps and long lines
This commit is contained in:
parent
f9bfa378ef
commit
ddae396ce3
@ -237,13 +237,13 @@ func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogOptions) *logWrit
|
|||||||
}
|
}
|
||||||
|
|
||||||
// writeLogs writes logs into stdout, stderr.
|
// writeLogs writes logs into stdout, stderr.
|
||||||
func (w *logWriter) write(msg *logMessage) error {
|
func (w *logWriter) write(msg *logMessage, addPrefix bool) error {
|
||||||
if msg.timestamp.Before(w.opts.since) {
|
if msg.timestamp.Before(w.opts.since) {
|
||||||
// Skip the line because it's older than since
|
// Skip the line because it's older than since
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
line := msg.log
|
line := msg.log
|
||||||
if w.opts.timestamp {
|
if w.opts.timestamp && addPrefix {
|
||||||
prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0])
|
prefix := append([]byte(msg.timestamp.Format(timeFormatOut)), delimiter[0])
|
||||||
line = append(prefix, line...)
|
line = append(prefix, line...)
|
||||||
}
|
}
|
||||||
@ -314,6 +314,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
|||||||
var watcher *fsnotify.Watcher
|
var watcher *fsnotify.Watcher
|
||||||
var parse parseFunc
|
var parse parseFunc
|
||||||
var stop bool
|
var stop bool
|
||||||
|
isNewLine := true
|
||||||
found := true
|
found := true
|
||||||
writer := newLogWriter(stdout, stderr, opts)
|
writer := newLogWriter(stdout, stderr, opts)
|
||||||
msg := &logMessage{}
|
msg := &logMessage{}
|
||||||
@ -399,7 +400,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Write the log line into the stream.
|
// Write the log line into the stream.
|
||||||
if err := writer.write(msg); err != nil {
|
if err := writer.write(msg, isNewLine); err != nil {
|
||||||
if err == errMaximumWrite {
|
if err == errMaximumWrite {
|
||||||
klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
|
klog.V(2).InfoS("Finished parsing log file, hit bytes limit", "path", path, "limit", opts.bytes)
|
||||||
return nil
|
return nil
|
||||||
@ -410,7 +411,11 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
|
|||||||
if limitedMode {
|
if limitedMode {
|
||||||
limitedNum--
|
limitedNum--
|
||||||
}
|
}
|
||||||
|
if len(msg.log) > 0 {
|
||||||
|
isNewLine = msg.log[len(msg.log)-1] == eol[0]
|
||||||
|
} else {
|
||||||
|
isNewLine = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,8 +17,12 @@ limitations under the License.
|
|||||||
package logs
|
package logs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -320,7 +324,7 @@ func TestWriteLogs(t *testing.T) {
|
|||||||
stdoutBuf := bytes.NewBuffer(nil)
|
stdoutBuf := bytes.NewBuffer(nil)
|
||||||
stderrBuf := bytes.NewBuffer(nil)
|
stderrBuf := bytes.NewBuffer(nil)
|
||||||
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{since: test.since, timestamp: test.timestamp, bytes: -1})
|
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{since: test.since, timestamp: test.timestamp, bytes: -1})
|
||||||
err := w.write(msg)
|
err := w.write(msg, true)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, test.expectStdout, stdoutBuf.String())
|
assert.Equal(t, test.expectStdout, stdoutBuf.String())
|
||||||
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
||||||
@ -384,13 +388,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
|
|||||||
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)})
|
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)})
|
||||||
for i := 0; i < test.stdoutLines; i++ {
|
for i := 0; i < test.stdoutLines; i++ {
|
||||||
msg.stream = runtimeapi.Stdout
|
msg.stream = runtimeapi.Stdout
|
||||||
if err := w.write(msg); err != nil {
|
if err := w.write(msg, true); err != nil {
|
||||||
assert.EqualError(t, err, errMaximumWrite.Error())
|
assert.EqualError(t, err, errMaximumWrite.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < test.stderrLines; i++ {
|
for i := 0; i < test.stderrLines; i++ {
|
||||||
msg.stream = runtimeapi.Stderr
|
msg.stream = runtimeapi.Stderr
|
||||||
if err := w.write(msg); err != nil {
|
if err := w.write(msg, true); err != nil {
|
||||||
assert.EqualError(t, err, errMaximumWrite.Error())
|
assert.EqualError(t, err, errMaximumWrite.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -398,3 +402,54 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
|
|||||||
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
assert.Equal(t, test.expectStderr, stderrBuf.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadLogsLimitsWithTimestamps(t *testing.T) {
|
||||||
|
logLineFmt := "2022-10-29T16:10:22.592603036-05:00 stdout P %v\n"
|
||||||
|
logLineNewLine := "2022-10-29T16:10:22.592603036-05:00 stdout F \n"
|
||||||
|
|
||||||
|
tmpfile, err := ioutil.TempFile("", "log.*.txt")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
count := 10000
|
||||||
|
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
tmpfile.WriteString(fmt.Sprintf(logLineFmt, i))
|
||||||
|
}
|
||||||
|
tmpfile.WriteString(logLineNewLine)
|
||||||
|
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
tmpfile.WriteString(fmt.Sprintf(logLineFmt, i))
|
||||||
|
}
|
||||||
|
tmpfile.WriteString(logLineNewLine)
|
||||||
|
|
||||||
|
// two lines are in the buffer
|
||||||
|
|
||||||
|
defer os.Remove(tmpfile.Name()) // clean up
|
||||||
|
|
||||||
|
assert.NoError(t, err)
|
||||||
|
tmpfile.Close()
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
w := io.MultiWriter(&buf)
|
||||||
|
|
||||||
|
err = ReadLogs(context.Background(), tmpfile.Name(), "", &LogOptions{tail: -1, bytes: -1, timestamp: true}, nil, w, w)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
lineCount := 0
|
||||||
|
scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes()))
|
||||||
|
for scanner.Scan() {
|
||||||
|
lineCount++
|
||||||
|
|
||||||
|
// Split the line
|
||||||
|
ts, logline, _ := bytes.Cut(scanner.Bytes(), []byte(" "))
|
||||||
|
|
||||||
|
// Verification
|
||||||
|
// 1. The timestamp should exist
|
||||||
|
// 2. The last item in the log should be 9999
|
||||||
|
_, err = time.Parse(time.RFC3339, string(ts))
|
||||||
|
assert.NoError(t, err, "timestamp not found")
|
||||||
|
assert.Equal(t, true, bytes.HasSuffix(logline, []byte("9999")), "is the complete log found")
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, 2, lineCount, "should have two lines")
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user