From 2c30eee92f8a1b461848cd0ad2b2848d908a69de Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 15 Jan 2019 19:14:36 +0100 Subject: [PATCH 1/5] kubelet: read immediately after creating the watcher If some events happen between the read and the moment the watcher is set up, we might miss them. Signed-off-by: Giuseppe Scrivano --- pkg/kubelet/kuberuntime/logs/logs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index d37ed2a9877..db88834001b 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -316,6 +316,9 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r if err := watcher.Add(f.Name()); err != nil { return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) } + // If we just created the watcher, try again to read as we might have missed + // the event. + continue } // Wait until the next log change. if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found { From 341c2c0d1fe13d36c44e41e29c21f46daa0073b8 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 15 Jan 2019 19:17:43 +0100 Subject: [PATCH 2/5] kubelet: handle recreated log files If the runtime is configured to rotate the log file, we might end up watching the old fd, which no longer receives any writes. When an fsnotify event other than Write is received, reopen the log file and recreate the watcher. Signed-off-by: Giuseppe Scrivano --- pkg/kubelet/kuberuntime/logs/logs.go | 46 ++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index db88834001b..b6e41ec4ad4 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -321,7 +321,26 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r continue } // Wait until the next log change. 
- if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found { + found, recreated, err := waitLogs(ctx, containerID, watcher, runtimeService) + if recreated { + newF, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + continue + } + return fmt.Errorf("failed to open log file %q: %v", path, err) + } + f.Close() + if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) { + klog.Errorf("failed to remove file watch %q: %v", f.Name(), err) + } + f = newF + if err := watcher.Add(f.Name()); err != nil { + return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + } + r = bufio.NewReader(f) + } + if !found { return err } continue @@ -373,34 +392,43 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) { return true, nil } -// waitLogs wait for the next log write. It returns a boolean and an error. The boolean -// indicates whether a new log is found; the error is error happens during waiting new logs. -func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { +// waitLogs wait for the next log write. It returns two booleans and an error. The first boolean +// indicates whether a new log is found; the second boolean if the log file was recreated; +// the error is error happens during waiting new logs. 
+func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) { // no need to wait if the pod is not running if running, err := isContainerRunning(id, runtimeService); !running { - return false, err + return false, false, err } errRetry := 5 for { select { case <-ctx.Done(): - return false, fmt.Errorf("context cancelled") + return false, false, fmt.Errorf("context cancelled") case e := <-w.Events: switch e.Op { case fsnotify.Write: - return true, nil + return true, false, nil + case fsnotify.Create: + fallthrough + case fsnotify.Rename: + fallthrough + case fsnotify.Remove: + fallthrough + case fsnotify.Chmod: + return true, true, nil default: klog.Errorf("Unexpected fsnotify event: %v, retrying...", e) } case err := <-w.Errors: klog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) if errRetry == 0 { - return false, err + return false, false, err } errRetry-- case <-time.After(stateCheckPeriod): if running, err := isContainerRunning(id, runtimeService); !running { - return false, err + return false, false, err } } } From 8f68b281e4bf74ffaf5f2b0be689eab4fcd8ec32 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 16 Jan 2019 21:54:14 +0100 Subject: [PATCH 3/5] kubelet: force checking the log file every second It seems fsnotify can miss some read events, preventing the kubelet from receiving more data from the log file. If we end up waiting for events with fsnotify, force a read from the log file every second so that we are sure not to miss new data for longer than that. Signed-off-by: Giuseppe Scrivano --- pkg/kubelet/kuberuntime/logs/logs.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index b6e41ec4ad4..3f92a80cb57 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -56,6 +56,9 @@ const ( // the container log. 
Kubelet should not keep following the log when the // container is not running. stateCheckPeriod = 5 * time.Second + + // logForceCheckPeriod is the period to check for a new read + logForceCheckPeriod = 1 * time.Second ) var ( @@ -426,6 +429,8 @@ func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeServic return false, false, err } errRetry-- + case <-time.After(logForceCheckPeriod): + return true, false, nil case <-time.After(stateCheckPeriod): if running, err := isContainerRunning(id, runtimeService); !running { return false, false, err From a561196bfed659eed6f2b49d212985e594c92155 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 21 Jan 2019 22:19:21 +0100 Subject: [PATCH 4/5] logs: consume all file until EOF on exited container If the container is not found, do not stop reading the log file immediately, but wait until we reach EOF again. Signed-off-by: Giuseppe Scrivano --- pkg/kubelet/kuberuntime/logs/logs.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index 3f92a80cb57..86133063df6 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -292,6 +292,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r var watcher *fsnotify.Watcher var parse parseFunc var stop bool + found := true writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} for { @@ -305,6 +306,10 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r return fmt.Errorf("failed to read log file %q: %v", path, err) } if opts.follow { + // The container is not running, we got to the end of the log. + if !found { + return nil + } // Reset seek so that if this is an incomplete line, // it will be read again. 
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { @@ -323,8 +328,12 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r // the event. continue } + var recreated bool // Wait until the next log change. - found, recreated, err := waitLogs(ctx, containerID, watcher, runtimeService) + found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService) + if err != nil { + return err + } if recreated { newF, err := os.Open(path) if err != nil { @@ -343,9 +352,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r } r = bufio.NewReader(f) } - if !found { - return err - } + // If the container exited consume data until the next EOF continue } // Should stop after writing the remaining content. From 31397083b2861a2a037f947184cd1f666622b5be Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Sat, 30 Mar 2019 18:45:14 +0100 Subject: [PATCH 5/5] test, e2e: add tests for logs -f Signed-off-by: Giuseppe Scrivano --- test/e2e/kubectl/kubectl.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 3f197b2b64b..56ea3c1b278 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -570,6 +570,21 @@ var _ = SIGDescribe("Kubectl client", func() { gomega.Expect(c.BatchV1().Jobs(ns).Delete("run-test-3", nil)).To(gomega.BeNil()) }) + ginkgo.It("should contain last line of the log", func() { + nsFlag := fmt.Sprintf("--namespace=%v", ns) + podName := "run-log-test" + + ginkgo.By("executing a command with run") + framework.RunKubectlOrDie("run", podName, "--generator=run-pod/v1", "--image="+busyboxImage, "--restart=OnFailure", nsFlag, "--", "sh", "-c", "sleep 10; seq 100 | while read i; do echo $i; sleep 0.01; done; echo EOF") + + if !framework.CheckPodsRunningReady(c, ns, []string{podName}, framework.PodStartTimeout) { + framework.Failf("Pod for run-log-test was not ready") + } + + logOutput := 
framework.RunKubectlOrDie(nsFlag, "logs", "-f", "run-log-test") + gomega.Expect(logOutput).To(gomega.ContainSubstring("EOF")) + }) + ginkgo.It("should support port-forward", func() { ginkgo.By("forwarding the container port to a local port") cmd := runPortForward(ns, simplePodName, simplePodPort)