Merge pull request #44406 from Random-Liu/stop-following-when-exited

Automatic merge from submit-queue

CRI: Stop following container log when container exited.

Fixes https://github.com/kubernetes/kubernetes/issues/44340.

This PR changes the kubelet to periodically check whether the container is still running while it is following the container's log, and to stop following once the container has exited.
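The heart of the change (visible in the second changed file below) is a `select` loop that blocks on fsnotify events but wakes up every `stateCheckPeriod` to ask the runtime whether the container is still running. Here is a minimal standalone sketch of that pattern, not the kubelet code itself; the `stillRunning` callback is hypothetical and stands in for the CRI `ContainerStatus` call:

```go
package follow

import (
    "time"

    "github.com/fsnotify/fsnotify"
)

const stateCheckPeriod = 5 * time.Second

// waitWrite blocks until the watched log file is written to, returning true.
// It returns false without an error when stillRunning reports that the log
// producer has exited -- the normal way a follow ends.
func waitWrite(w *fsnotify.Watcher, stillRunning func() (bool, error)) (bool, error) {
    for {
        select {
        case e := <-w.Events:
            if e.Op&fsnotify.Write == fsnotify.Write {
                return true, nil
            }
            // Ignore other events (chmod, rename, ...) and keep waiting.
        case err := <-w.Errors:
            return false, err
        case <-time.After(stateCheckPeriod):
            // No write for a while; check whether the producer is still
            // alive so we do not follow a dead container's log forever.
            running, err := stillRunning()
            if err != nil {
                return false, err
            }
            if !running {
                return false, nil
            }
        }
    }
}
```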

I've tried this PR in my local cluster, following a container that prints the date every few seconds:
```
Wed Apr 12 20:23:54 UTC 2017
Wed Apr 12 20:23:58 UTC 2017
Wed Apr 12 20:24:02 UTC 2017
Wed Apr 12 20:24:06 UTC 2017
Wed Apr 12 20:24:10 UTC 2017
Wed Apr 12 20:24:14 UTC 2017
Wed Apr 12 20:24:18 UTC 2017
Wed Apr 12 20:24:22 UTC 2017
Wed Apr 12 20:24:26 UTC 2017
Wed Apr 12 20:24:30 UTC 2017
Wed Apr 12 20:24:34 UTC 2017
Wed Apr 12 20:24:38 UTC 2017
Wed Apr 12 20:24:42 UTC 2017
Wed Apr 12 20:24:46 UTC 2017
failed to wait logs for log file "/var/log/pods/1d54634c7b31346fc3219f5e0b7507cc/nginx_0.log": container "b9a17a2c53550c3703ab350d85911743af8bf164a41813544fd08fb9585f7501" is not running (state="CONTAINER_EXITED")
```
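For reference, output like the above can be reproduced with something along these lines (a hypothetical repro, not part of the PR; the pod name and timings are illustrative):

```sh
# A pod that prints the date every 4 seconds for a minute, then exits.
kubectl run dates --image=busybox --restart=Never \
  -- sh -c 'for i in $(seq 1 15); do date; sleep 4; done'

# Follow its log. Without this PR the follow hangs after the container
# exits; with it, the follow terminates once the runtime reports
# CONTAINER_EXITED.
kubectl logs -f dates
```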

The only behavioral difference is that `ReadLogs` returns an error when the container exits while the log is being followed. I'm not sure whether we should get rid of that error or not.

@yujuhong @feiskyer @JorritSalverda
/cc @kubernetes/sig-node-bugs 

**Release note**:
```release-note
`kubectl logs -f` now stops following when container stops.
```
Merged by Kubernetes Submit Queue on 2017-04-13 19:10:28 -07:00 (committed by GitHub)
commit 1cf6ef08df
2 changed files with 50 additions and 29 deletions

First changed file:

```diff
@@ -357,10 +357,10 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag

 // readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented
 // by path. It reads up to max log lines.
-func readLastStringFromContainerLogs(path string) string {
+func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string {
 	value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
 	buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
-	if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
+	if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
 		return fmt.Sprintf("Error on reading termination message from logs: %v", err)
 	}
 	return buf.String()
@@ -414,7 +414,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n
 		tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
 		if checkLogs {
 			path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount)
-			tMessage = readLastStringFromContainerLogs(path)
+			tMessage = m.readLastStringFromContainerLogs(path)
 		}
 		// Use the termination message written by the application is not empty
 		if len(tMessage) != 0 {
@@ -688,7 +688,7 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID ku
 	labeledInfo := getContainerInfoFromLabels(status.Labels)
 	annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
 	path := buildFullContainerLogsPath(pod.UID, labeledInfo.ContainerName, annotatedInfo.RestartCount)
-	return ReadLogs(path, logOptions, stdout, stderr)
+	return m.ReadLogs(path, containerID.ID, logOptions, stdout, stderr)
 }

 // GetExec gets the endpoint the runtime will serve the exec request from.
```

Second changed file:

```diff
@@ -32,6 +32,7 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/kubernetes/pkg/api/v1"
+	runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
 	"k8s.io/kubernetes/pkg/util/tail"
 )
@@ -54,6 +55,11 @@ const (
 	timeFormat = time.RFC3339Nano
 	// blockSize is the block size used in tail.
 	blockSize = 1024
+
+	// stateCheckPeriod is the period to check container state while following
+	// the container log. Kubelet should not keep following the log when the
+	// container is not running.
+	stateCheckPeriod = 5 * time.Second
 )

 var (
@@ -110,7 +116,9 @@ func newLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *logOptions {
 }

 // ReadLogs read the container log and redirect into stdout and stderr.
-func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
+// Note that containerID is only needed when following the log, or else
+// just pass in empty string "".
+func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
 	f, err := os.Open(path)
 	if err != nil {
 		return fmt.Errorf("failed to open log file %q: %v", path, err)
@@ -166,8 +174,8 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
 			}
 		}
 		// Wait until the next log change.
-		if err := waitLogs(watcher); err != nil {
-			return fmt.Errorf("failed to wait logs for log file %q: %v", path, err)
+		if found, err := m.waitLogs(containerID, watcher); !found {
+			return err
 		}
 		continue
 	}
@@ -196,6 +204,41 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer)
 	}
 }

+// waitLogs waits for the next log write. It returns a boolean and an error. The boolean
+// indicates whether a new log was found; the error is any error that occurred while waiting.
+func (m *kubeGenericRuntimeManager) waitLogs(id string, w *fsnotify.Watcher) (bool, error) {
+	errRetry := 5
+	for {
+		select {
+		case e := <-w.Events:
+			switch e.Op {
+			case fsnotify.Write:
+				return true, nil
+			default:
+				glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
+			}
+		case err := <-w.Errors:
+			glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
+			if errRetry == 0 {
+				return false, err
+			}
+			errRetry--
+		case <-time.After(stateCheckPeriod):
+			s, err := m.runtimeService.ContainerStatus(id)
+			if err != nil {
+				return false, err
+			}
+			// Only keep following the container log while the container is running.
+			if s.State != runtimeapi.ContainerState_CONTAINER_RUNNING {
+				glog.Errorf("Container %q is not running (state=%q)", id, s.State)
+				// Do not return an error; it is normal for the container to stop
+				// while we are waiting.
+				return false, nil
+			}
+		}
+	}
+}
+
 // parseFunc is a function parsing one log line to the internal log type.
 // Notice that the caller must make sure logMessage is not nil.
 type parseFunc func([]byte, *logMessage) error
@@ -267,28 +310,6 @@ func getParseFunc(log []byte) (parseFunc, error) {
 	return nil, fmt.Errorf("unsupported log format: %q", log)
 }

-// waitLogs wait for the next log write.
-func waitLogs(w *fsnotify.Watcher) error {
-	errRetry := 5
-	for {
-		select {
-		case e := <-w.Events:
-			switch e.Op {
-			case fsnotify.Write:
-				return nil
-			default:
-				glog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
-			}
-		case err := <-w.Errors:
-			glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
-			if errRetry == 0 {
-				return err
-			}
-			errRetry--
-		}
-	}
-}
-
 // logWriter controls the writing into the stream based on the log options.
 type logWriter struct {
 	stdout io.Writer
```