Merge pull request #115702 from xyz-li/master

Fix: kubelet does not output logs after the log file is rotated
This commit is contained in:
Kubernetes Prow Robot 2023-10-14 22:42:04 +02:00 committed by GitHub
commit 4911aad463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 18 deletions

View File

@ -318,6 +318,8 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
found := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
baseName := filepath.Base(path)
dir := filepath.Dir(path)
for {
if stop || (limitedMode && limitedNum == 0) {
klog.V(2).InfoS("Finished parsing log file", "path", path)
@ -344,8 +346,8 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
defer watcher.Close()
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
if err := watcher.Add(dir); err != nil {
return fmt.Errorf("failed to watch directory %q: %w", dir, err)
}
// If we just created the watcher, try again to read as we might have missed
// the event.
@ -353,7 +355,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
}
var recreated bool
// Wait until the next log change.
found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
found, recreated, err = waitLogs(ctx, containerID, baseName, watcher, runtimeService)
if err != nil {
return err
}
@ -367,13 +369,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
}
defer newF.Close()
f.Close()
if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove file watch", "path", f.Name())
}
f = newF
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
r = bufio.NewReader(f)
}
// If the container exited consume data until the next EOF
@ -441,7 +437,7 @@ func isContainerRunning(ctx context.Context, id string, r internalapi.RuntimeSer
// waitLogs waits for the next log write. It returns two booleans and an error. The first boolean
// indicates whether a new log is found; the second boolean indicates whether the log file was recreated;
// the error is any error that occurred while waiting for new logs.
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
func waitLogs(ctx context.Context, id string, logName string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
// no need to wait if the pod is not running
if running, err := isContainerRunning(ctx, id, runtimeService); !running {
return false, false, err
@ -453,16 +449,10 @@ func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeServic
return false, false, fmt.Errorf("context cancelled")
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
case fsnotify.Write, fsnotify.Rename, fsnotify.Remove, fsnotify.Chmod:
return true, false, nil
case fsnotify.Create:
fallthrough
case fsnotify.Rename:
fallthrough
case fsnotify.Remove:
fallthrough
case fsnotify.Chmod:
return true, true, nil
return true, filepath.Base(e.Name) == logName, nil
default:
klog.ErrorS(nil, "Received unexpected fsnotify event, retrying", "event", e)
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"testing"
"time"
@ -30,6 +31,7 @@ import (
v1 "k8s.io/api/core/v1"
apitesting "k8s.io/cri-api/pkg/apis/testing"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/pointer"
"github.com/stretchr/testify/assert"
@ -211,6 +213,88 @@ func TestReadLogs(t *testing.T) {
}
}
func TestReadRotatedLog(t *testing.T) {
tmpDir := t.TempDir()
file, err := os.CreateTemp(tmpDir, "logfile")
if err != nil {
assert.NoErrorf(t, err, "unable to create temp file")
}
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
containerID := "fake-container-id"
fakeRuntimeService := &apitesting.FakeRuntimeService{
Containers: map[string]*apitesting.FakeContainer{
containerID: {
ContainerStatus: runtimeapi.ContainerStatus{
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
},
},
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start to follow the container's log.
go func(ctx context.Context) {
podLogOptions := v1.PodLogOptions{
Follow: true,
}
opts := NewLogOptions(&podLogOptions, time.Now())
ReadLogs(ctx, file.Name(), containerID, opts, fakeRuntimeService, stdoutBuf, stderrBuf)
}(ctx)
// log in stdout
expectedStdout := "line0\nline2\nline4\nline6\nline8\n"
// log in stderr
expectedStderr := "line1\nline3\nline5\nline7\nline9\n"
dir := filepath.Dir(file.Name())
baseName := filepath.Base(file.Name())
// Write 10 lines to log file.
// Let ReadLogs start.
time.Sleep(50 * time.Millisecond)
for line := 0; line < 10; line++ {
// Write the first three lines to log file
now := time.Now().Format(types.RFC3339NanoLenient)
if line%2 == 0 {
file.WriteString(fmt.Sprintf(
`{"log":"line%d\n","stream":"stdout","time":"%s"}`+"\n", line, now))
} else {
file.WriteString(fmt.Sprintf(
`{"log":"line%d\n","stream":"stderr","time":"%s"}`+"\n", line, now))
}
time.Sleep(1 * time.Millisecond)
if line == 5 {
file.Close()
// Pretend to rotate the log.
rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405"))
rotatedName = filepath.Join(dir, rotatedName)
if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil {
assert.NoErrorf(t, err, "failed to rotate log %q to %q", file.Name(), rotatedName)
return
}
newF := filepath.Join(dir, baseName)
if file, err = os.Create(newF); err != nil {
assert.NoError(t, err, "unable to create new log file")
return
}
time.Sleep(20 * time.Millisecond)
}
}
time.Sleep(20 * time.Millisecond)
// Make the function ReadLogs end.
fakeRuntimeService.Lock()
fakeRuntimeService.Containers[containerID].State = runtimeapi.ContainerState_CONTAINER_EXITED
fakeRuntimeService.Unlock()
assert.Equal(t, expectedStdout, stdoutBuf.String())
assert.Equal(t, expectedStderr, stderrBuf.String())
}
func TestParseLog(t *testing.T) {
timestamp, err := time.Parse(timeFormatIn, "2016-10-20T18:39:20.57606443Z")
assert.NoError(t, err)