diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 5c519354e58..5fab0ffd6dc 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -357,10 +357,10 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag // readLastStringFromContainerLogs attempts to read up to the max log length from the end of the CRI log represented // by path. It reads up to max log lines. -func readLastStringFromContainerLogs(path string) string { +func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string { value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) - if err := ReadLogs(path, &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { + if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { return fmt.Sprintf("Error on reading termination message from logs: %v", err) } return buf.String() @@ -414,7 +414,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs) if checkLogs { path := buildFullContainerLogsPath(uid, labeledInfo.ContainerName, annotatedInfo.RestartCount) - tMessage = readLastStringFromContainerLogs(path) + tMessage = m.readLastStringFromContainerLogs(path) } // Use the termination message written by the application is not empty if len(tMessage) != 0 { @@ -688,7 +688,7 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID ku labeledInfo := getContainerInfoFromLabels(status.Labels) annotatedInfo := getContainerInfoFromAnnotations(status.Annotations) path := buildFullContainerLogsPath(pod.UID, labeledInfo.ContainerName, annotatedInfo.RestartCount) - return ReadLogs(path, logOptions, stdout, stderr) + return m.ReadLogs(path, containerID.ID, logOptions, stdout, stderr) } // GetExec gets the endpoint the runtime will serve the exec request from. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs.go b/pkg/kubelet/kuberuntime/kuberuntime_logs.go index 25a507f368c..386759282f0 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs.go @@ -32,6 +32,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api/v1" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/util/tail" ) @@ -54,6 +55,11 @@ const ( timeFormat = time.RFC3339Nano // blockSize is the block size used in tail. blockSize = 1024 + + // stateCheckPeriod is the period to check container state while following + // the container log. Kubelet should not keep following the log when the + // container is not running. + stateCheckPeriod = 5 * time.Second ) var ( @@ -110,7 +116,9 @@ func newLogOptions(apiOpts *v1.PodLogOptions, now time.Time) *logOptions { } // ReadLogs read the container log and redirect into stdout and stderr. -func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { +// Note that containerID is only needed when following the log, or else +// just pass in empty string "". +func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { f, err := os.Open(path) if err != nil { return fmt.Errorf("failed to open log file %q: %v", path, err) @@ -166,8 +174,8 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) } } // Wait until the next log change. - if err := waitLogs(watcher); err != nil { - return fmt.Errorf("failed to wait logs for log file %q: %v", path, err) + if found, err := m.waitLogs(containerID, watcher); !found { + return err } continue } @@ -196,6 +204,41 @@ func ReadLogs(path string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) } } +// waitLogs wait for the next log write. It returns a boolean and an error. The boolean +// indicates whether a new log is found; the error is error happens during waiting new logs. +func (m *kubeGenericRuntimeManager) waitLogs(id string, w *fsnotify.Watcher) (bool, error) { + errRetry := 5 + for { + select { + case e := <-w.Events: + switch e.Op { + case fsnotify.Write: + return true, nil + default: + glog.Errorf("Unexpected fsnotify event: %v, retrying...", e) + } + case err := <-w.Errors: + glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) + if errRetry == 0 { + return false, err + } + errRetry-- + case <-time.After(stateCheckPeriod): + s, err := m.runtimeService.ContainerStatus(id) + if err != nil { + return false, err + } + // Only keep following container log when it is running. + if s.State != runtimeapi.ContainerState_CONTAINER_RUNNING { + glog.Errorf("Container %q is not running (state=%q)", id, s.State) + // Do not return error because it's normal that the container stops + // during waiting. + return false, nil + } + } + } +} + // parseFunc is a function parsing one log line to the internal log type. // Notice that the caller must make sure logMessage is not nil. type parseFunc func([]byte, *logMessage) error @@ -267,28 +310,6 @@ func getParseFunc(log []byte) (parseFunc, error) { return nil, fmt.Errorf("unsupported log format: %q", log) } -// waitLogs wait for the next log write. -func waitLogs(w *fsnotify.Watcher) error { - errRetry := 5 - for { - select { - case e := <-w.Events: - switch e.Op { - case fsnotify.Write: - return nil - default: - glog.Errorf("Unexpected fsnotify event: %v, retrying...", e) - } - case err := <-w.Errors: - glog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) - if errRetry == 0 { - return err - } - errRetry-- - } - } -} - // logWriter controls the writing into the stream based on the log options. type logWriter struct { stdout io.Writer