diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go
index d37ed2a9877..86133063df6 100644
--- a/pkg/kubelet/kuberuntime/logs/logs.go
+++ b/pkg/kubelet/kuberuntime/logs/logs.go
@@ -56,6 +56,9 @@ const (
 	// the container log. Kubelet should not keep following the log when the
 	// container is not running.
 	stateCheckPeriod = 5 * time.Second
+
+	// logForceCheckPeriod is the period after which a new read is forced even if no fsnotify event was received
+	logForceCheckPeriod = 1 * time.Second
 )
 
 var (
@@ -289,6 +292,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
 	var watcher *fsnotify.Watcher
 	var parse parseFunc
 	var stop bool
+	found := true
 	writer := newLogWriter(stdout, stderr, opts)
 	msg := &logMessage{}
 	for {
@@ -302,6 +306,10 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
 				return fmt.Errorf("failed to read log file %q: %v", path, err)
 			}
 			if opts.follow {
+				// The container is not running, and we got to the end of the log.
+				if !found {
+					return nil
+				}
 				// Reset seek so that if this is an incomplete line,
 				// it will be read again.
 				if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
@@ -316,11 +324,35 @@
 					if err := watcher.Add(f.Name()); err != nil {
 						return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
 					}
+					// If we just created the watcher, try again to read as we might have missed
+					// the event.
+					continue
 				}
+				var recreated bool
 				// Wait until the next log change.
-				if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found {
+				found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
+				if err != nil {
 					return err
 				}
+				if recreated {
+					newF, err := os.Open(path)
+					if err != nil {
+						if os.IsNotExist(err) {
+							continue
+						}
+						return fmt.Errorf("failed to open log file %q: %v", path, err)
+					}
+					f.Close()
+					if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
+						klog.Errorf("failed to remove file watch %q: %v", f.Name(), err)
+					}
+					f = newF
+					if err := watcher.Add(f.Name()); err != nil {
+						return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
+					}
+					r = bufio.NewReader(f)
+				}
+				// If the container exited, consume data until the next EOF.
 				continue
 			}
 			// Should stop after writing the remaining content.
@@ -370,34 +402,45 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) {
 	return true, nil
 }
 
-// waitLogs wait for the next log write. It returns a boolean and an error. The boolean
-// indicates whether a new log is found; the error is error happens during waiting new logs.
-func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) {
+// waitLogs waits for the next log write. It returns two booleans and an error. The first boolean
+// indicates whether a new log is found; the second boolean indicates whether the log file was
+// recreated; the error is any error that happened while waiting for new logs.
+func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
 	// no need to wait if the pod is not running
 	if running, err := isContainerRunning(id, runtimeService); !running {
-		return false, err
+		return false, false, err
 	}
 	errRetry := 5
 	for {
 		select {
 		case <-ctx.Done():
-			return false, fmt.Errorf("context cancelled")
+			return false, false, fmt.Errorf("context cancelled")
 		case e := <-w.Events:
 			switch e.Op {
 			case fsnotify.Write:
-				return true, nil
+				return true, false, nil
+			case fsnotify.Create:
+				fallthrough
+			case fsnotify.Rename:
+				fallthrough
+			case fsnotify.Remove:
+				fallthrough
+			case fsnotify.Chmod:
+				return true, true, nil
 			default:
 				klog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
 			}
 		case err := <-w.Errors:
 			klog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
 			if errRetry == 0 {
-				return false, err
+				return false, false, err
 			}
 			errRetry--
+		case <-time.After(logForceCheckPeriod):
+			return true, false, nil
 		case <-time.After(stateCheckPeriod):
 			if running, err := isContainerRunning(id, runtimeService); !running {
-				return false, err
+				return false, false, err
 			}
 		}
 	}
diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go
index 3f197b2b64b..56ea3c1b278 100644
--- a/test/e2e/kubectl/kubectl.go
+++ b/test/e2e/kubectl/kubectl.go
@@ -570,6 +570,21 @@ var _ = SIGDescribe("Kubectl client", func() {
 			gomega.Expect(c.BatchV1().Jobs(ns).Delete("run-test-3", nil)).To(gomega.BeNil())
 		})
 
+		ginkgo.It("should contain last line of the log", func() {
+			nsFlag := fmt.Sprintf("--namespace=%v", ns)
+			podName := "run-log-test"
+
+			ginkgo.By("executing a command with run")
+			framework.RunKubectlOrDie("run", podName, "--generator=run-pod/v1", "--image="+busyboxImage, "--restart=OnFailure", nsFlag, "--", "sh", "-c", "sleep 10; seq 100 | while read i; do echo $i; sleep 0.01; done; echo EOF")
+
+			if !framework.CheckPodsRunningReady(c, ns, []string{podName}, framework.PodStartTimeout) {
+				framework.Failf("Pod for run-log-test was not ready")
+			}
+
+			logOutput := framework.RunKubectlOrDie(nsFlag, "logs", "-f", "run-log-test")
+			gomega.Expect(logOutput).To(gomega.ContainSubstring("EOF"))
+		})
+
 		ginkgo.It("should support port-forward", func() {
 			ginkgo.By("forwarding the container port to a local port")
 			cmd := runPortForward(ns, simplePodName, simplePodPort)
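The logs.go hunks above boil down to one pattern: block on fsnotify at EOF, treat Create/Rename/Remove/Chmod as "the log file may have been recreated", reopen the path in that case, and also wake up periodically (logForceCheckPeriod) so a missed event cannot stall the follower. Below is a minimal, self-contained sketch of that pattern, not the kubelet code: the file name follow_sketch.go, the follow helper, the one-second forceCheckPeriod, and the print-partial-lines behavior (the patch seeks back instead) are all assumptions made for illustration; only the public github.com/fsnotify/fsnotify API is taken as given.

// follow_sketch.go — a minimal sketch of the reopen-on-recreate follow pattern
// used by the patch above. Illustrative only; not the kubelet implementation.
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/fsnotify/fsnotify"
)

// forceCheckPeriod mirrors the role of logForceCheckPeriod: even with no
// fsnotify event, retry the read periodically so a missed event cannot
// stall the follower forever. The one-second value is an assumption.
const forceCheckPeriod = 1 * time.Second

func follow(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer func() { f.Close() }() // closes whichever file is current on return

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()
	if err := watcher.Add(path); err != nil {
		return err
	}

	r := bufio.NewReader(f)
	for {
		line, err := r.ReadString('\n')
		if len(line) > 0 {
			fmt.Print(line) // emit whatever we have, even a partial line at EOF
		}
		if err == nil {
			continue // got a full line; keep reading without waiting
		}
		if err != io.EOF {
			return err
		}
		// At EOF: block until the file changes, looks recreated, or the
		// forced-check period elapses — the same wake-ups waitLogs now has.
		select {
		case e := <-watcher.Events:
			// Create/Rename/Remove suggest the file was rotated or recreated,
			// so reopen the path and restart the reader on the new inode.
			if e.Op&(fsnotify.Create|fsnotify.Rename|fsnotify.Remove) != 0 {
				newF, err := os.Open(path)
				if err != nil {
					return err
				}
				f.Close()
				f = newF
				r = bufio.NewReader(f)
				_ = watcher.Add(path) // the old watch died with the old inode
			}
		case err := <-watcher.Errors:
			return err
		case <-time.After(forceCheckPeriod):
			// Fall through to another read attempt.
		}
	}
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: follow <file>")
		os.Exit(1)
	}
	if err := follow(os.Args[1]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}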