diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index a2dedb08f80..b22734d792c 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -318,6 +318,8 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r found := true writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} + baseName := filepath.Base(path) + dir := filepath.Dir(path) for { if stop || (limitedMode && limitedNum == 0) { klog.V(2).InfoS("Finished parsing log file", "path", path) @@ -344,8 +346,8 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r return fmt.Errorf("failed to create fsnotify watcher: %v", err) } defer watcher.Close() - if err := watcher.Add(f.Name()); err != nil { - return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + if err := watcher.Add(dir); err != nil { + return fmt.Errorf("failed to watch directory %q: %w", dir, err) } // If we just created the watcher, try again to read as we might have missed // the event. @@ -353,7 +355,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r } var recreated bool // Wait until the next log change. - found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService) + found, recreated, err = waitLogs(ctx, containerID, baseName, watcher, runtimeService) if err != nil { return err } @@ -367,13 +369,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r } defer newF.Close() f.Close() - if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) { - klog.ErrorS(err, "Failed to remove file watch", "path", f.Name()) - } f = newF - if err := watcher.Add(f.Name()); err != nil { - return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) - } r = bufio.NewReader(f) } // If the container exited consume data until the next EOF @@ -441,7 +437,7 @@ func isContainerRunning(ctx context.Context, id string, r internalapi.RuntimeSer // waitLogs wait for the next log write. It returns two booleans and an error. The first boolean // indicates whether a new log is found; the second boolean if the log file was recreated; // the error is error happens during waiting new logs. -func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) { +func waitLogs(ctx context.Context, id string, logName string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) { // no need to wait if the pod is not running if running, err := isContainerRunning(ctx, id, runtimeService); !running { return false, false, err @@ -453,16 +449,10 @@ func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeServic return false, false, fmt.Errorf("context cancelled") case e := <-w.Events: switch e.Op { - case fsnotify.Write: + case fsnotify.Write, fsnotify.Rename, fsnotify.Remove, fsnotify.Chmod: return true, false, nil case fsnotify.Create: - fallthrough - case fsnotify.Rename: - fallthrough - case fsnotify.Remove: - fallthrough - case fsnotify.Chmod: - return true, true, nil + return true, filepath.Base(e.Name) == logName, nil default: klog.ErrorS(nil, "Received unexpected fsnotify event, retrying", "event", e) } diff --git a/pkg/kubelet/kuberuntime/logs/logs_test.go b/pkg/kubelet/kuberuntime/logs/logs_test.go index f2c17cb8161..7b5e2b3a585 100644 --- a/pkg/kubelet/kuberuntime/logs/logs_test.go +++ b/pkg/kubelet/kuberuntime/logs/logs_test.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "os" + "path/filepath" "testing" "time" @@ -30,6 +31,7 @@ import ( v1 "k8s.io/api/core/v1" apitesting "k8s.io/cri-api/pkg/apis/testing" + "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/utils/pointer" "github.com/stretchr/testify/assert" @@ -211,6 +213,88 @@ func TestReadLogs(t *testing.T) { } } +func TestReadRotatedLog(t *testing.T) { + tmpDir := t.TempDir() + file, err := os.CreateTemp(tmpDir, "logfile") + if err != nil { + assert.NoErrorf(t, err, "unable to create temp file") + } + + stdoutBuf := &bytes.Buffer{} + stderrBuf := &bytes.Buffer{} + containerID := "fake-container-id" + fakeRuntimeService := &apitesting.FakeRuntimeService{ + Containers: map[string]*apitesting.FakeContainer{ + containerID: { + ContainerStatus: runtimeapi.ContainerStatus{ + State: runtimeapi.ContainerState_CONTAINER_RUNNING, + }, + }, + }, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Start to follow the container's log. + go func(ctx context.Context) { + podLogOptions := v1.PodLogOptions{ + Follow: true, + } + opts := NewLogOptions(&podLogOptions, time.Now()) + ReadLogs(ctx, file.Name(), containerID, opts, fakeRuntimeService, stdoutBuf, stderrBuf) + }(ctx) + + // log in stdout + expectedStdout := "line0\nline2\nline4\nline6\nline8\n" + // log in stderr + expectedStderr := "line1\nline3\nline5\nline7\nline9\n" + + dir := filepath.Dir(file.Name()) + baseName := filepath.Base(file.Name()) + + // Write 10 lines to log file. + // Let ReadLogs start. + time.Sleep(50 * time.Millisecond) + for line := 0; line < 10; line++ { + // Write the first three lines to log file + now := time.Now().Format(types.RFC3339NanoLenient) + if line%2 == 0 { + file.WriteString(fmt.Sprintf( + `{"log":"line%d\n","stream":"stdout","time":"%s"}`+"\n", line, now)) + } else { + file.WriteString(fmt.Sprintf( + `{"log":"line%d\n","stream":"stderr","time":"%s"}`+"\n", line, now)) + } + time.Sleep(1 * time.Millisecond) + + if line == 5 { + file.Close() + // Pretend to rotate the log. + rotatedName := fmt.Sprintf("%s.%s", baseName, time.Now().Format("220060102-150405")) + rotatedName = filepath.Join(dir, rotatedName) + if err := os.Rename(filepath.Join(dir, baseName), rotatedName); err != nil { + assert.NoErrorf(t, err, "failed to rotate log %q to %q", file.Name(), rotatedName) + return + } + + newF := filepath.Join(dir, baseName) + if file, err = os.Create(newF); err != nil { + assert.NoError(t, err, "unable to create new log file") + return + } + time.Sleep(20 * time.Millisecond) + } + } + + time.Sleep(20 * time.Millisecond) + // Make the function ReadLogs end. + fakeRuntimeService.Lock() + fakeRuntimeService.Containers[containerID].State = runtimeapi.ContainerState_CONTAINER_EXITED + fakeRuntimeService.Unlock() + + assert.Equal(t, expectedStdout, stdoutBuf.String()) + assert.Equal(t, expectedStderr, stderrBuf.String()) +} + func TestParseLog(t *testing.T) { timestamp, err := time.Parse(timeFormatIn, "2016-10-20T18:39:20.57606443Z") assert.NoError(t, err)