Merge pull request #73041 from giuseppe/fix-logs

kubelet: fix some race conditions with logs -f
This commit is contained in:
Kubernetes Prow Robot 2019-04-01 10:02:34 -07:00 committed by GitHub
commit 46ae77421c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 9 deletions

View File

@ -56,6 +56,9 @@ const (
// the container log. Kubelet should not keep following the log when the
// container is not running.
stateCheckPeriod = 5 * time.Second
// logForceCheckPeriod is the period to check for a new read
logForceCheckPeriod = 1 * time.Second
)
var (
@ -289,6 +292,7 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
var watcher *fsnotify.Watcher
var parse parseFunc
var stop bool
found := true
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
for {
@ -302,6 +306,10 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
return fmt.Errorf("failed to read log file %q: %v", path, err)
}
if opts.follow {
// The container is not running, and we have reached the end of the log.
if !found {
return nil
}
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil {
@ -316,11 +324,35 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
// If we just created the watcher, try again to read as we might have missed
// the event.
continue
}
var recreated bool
// Wait until the next log change.
if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found {
found, recreated, err = waitLogs(ctx, containerID, watcher, runtimeService)
if err != nil {
return err
}
if recreated {
newF, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
f.Close()
if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove file watch %q: %v", f.Name(), err)
}
f = newF
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
r = bufio.NewReader(f)
}
// If the container exited, consume data until the next EOF
continue
}
// Should stop after writing the remaining content.
@ -370,34 +402,45 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) {
return true, nil
}
// waitLogs waits for the next log write. It returns a boolean and an error. The boolean
// indicates whether a new log is found; the error is any error that occurs while waiting for new logs.
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) {
// waitLogs waits for the next log write. It returns two booleans and an error. The first boolean
// indicates whether a new log is found; the second boolean reports whether the log file was recreated;
// the error is any error that occurs while waiting for new logs.
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
// no need to wait if the pod is not running
if running, err := isContainerRunning(id, runtimeService); !running {
return false, err
return false, false, err
}
errRetry := 5
for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context cancelled")
return false, false, fmt.Errorf("context cancelled")
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:
return true, nil
return true, false, nil
case fsnotify.Create:
fallthrough
case fsnotify.Rename:
fallthrough
case fsnotify.Remove:
fallthrough
case fsnotify.Chmod:
return true, true, nil
default:
klog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
}
case err := <-w.Errors:
klog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 {
return false, err
return false, false, err
}
errRetry--
case <-time.After(logForceCheckPeriod):
return true, false, nil
case <-time.After(stateCheckPeriod):
if running, err := isContainerRunning(id, runtimeService); !running {
return false, err
return false, false, err
}
}
}

View File

@ -570,6 +570,21 @@ var _ = SIGDescribe("Kubectl client", func() {
gomega.Expect(c.BatchV1().Jobs(ns).Delete("run-test-3", nil)).To(gomega.BeNil())
})
ginkgo.It("should contain last line of the log", func() {
nsFlag := fmt.Sprintf("--namespace=%v", ns)
podName := "run-log-test"
ginkgo.By("executing a command with run")
framework.RunKubectlOrDie("run", podName, "--generator=run-pod/v1", "--image="+busyboxImage, "--restart=OnFailure", nsFlag, "--", "sh", "-c", "sleep 10; seq 100 | while read i; do echo $i; sleep 0.01; done; echo EOF")
if !framework.CheckPodsRunningReady(c, ns, []string{podName}, framework.PodStartTimeout) {
framework.Failf("Pod for run-log-test was not ready")
}
logOutput := framework.RunKubectlOrDie(nsFlag, "logs", "-f", "run-log-test")
gomega.Expect(logOutput).To(gomega.ContainSubstring("EOF"))
})
ginkgo.It("should support port-forward", func() {
ginkgo.By("forwarding the container port to a local port")
cmd := runPortForward(ns, simplePodName, simplePodPort)