diff --git a/pkg/kubelet/rkt/log.go b/pkg/kubelet/rkt/log.go index b4bffa0b46f..4213f591fa8 100644 --- a/pkg/kubelet/rkt/log.go +++ b/pkg/kubelet/rkt/log.go @@ -17,121 +17,70 @@ limitations under the License. package rkt import ( - "bufio" - "bytes" - "encoding/json" "fmt" "io" - "os/exec" - "strconv" "strings" - "sync" "time" "github.com/golang/glog" + "golang.org/x/net/context" + rktapi "github.com/coreos/rkt/api/v1alpha" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/util/format" ) const ( - // The layout of the time format that satisfies the `--since` option for journalctl. - // See man journalctl for more details. - journalSinceLayout = "2006-01-02 15:04:05" + journalTimestampLayout = "2006-01-02 15:04:05 -0700 MST" ) -var journalNoEntriesLine = []byte(`"-- No entries --"`) +// processLines write the lines into stdout in the required format. +func processLines(lines []string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) { + msgKey := "MESSAGE=" -// pipeLog reads and parses the journal json object from r, -// and writes the logs line by line to w. -func pipeLog(wg *sync.WaitGroup, logOptions *api.PodLogOptions, r io.ReadCloser, w io.Writer) { - defer func() { - r.Close() - wg.Done() - }() - - scanner := bufio.NewScanner(r) - for scanner.Scan() { - var data interface{} - b := scanner.Bytes() - if bytes.Equal(b, journalNoEntriesLine) { + for _, line := range lines { + msgStart := strings.Index(line, msgKey) + if msgStart < 0 { + glog.Warningf("rkt: Invalid log line %q, missing %q", line, msgKey) continue } - if err := json.Unmarshal(b, &data); err != nil { - glog.Warningf("rkt: Cannot unmarshal journal log %q, skipping line: %v", string(b), err) - continue - } + tss := strings.TrimSpace(line[:msgStart]) + msg := strings.TrimPrefix(line[msgStart:], msgKey) - // Decode the json object as a map so we don't have to create a struct for it. - m := data.(map[string]interface{}) - msg := m["MESSAGE"].(string) - tss := m["__REALTIME_TIMESTAMP"].(string) - - ts, err := strconv.ParseInt(tss, 0, 64) + t, err := time.Parse(journalTimestampLayout, tss) if err != nil { - glog.Warningf("rkt: Cannot parse timestamp of journal log, skipping line: %v", err) + glog.Warningf("rkt: Failed to parse the timestamp %q: %v", tss, err) continue } - // '_REALTIME_TIMESTAMP' is the microseconds since epoch. - // http://www.freedesktop.org/software/systemd/man/sd_journal_get_realtime_usec.html#Description - micros := time.Duration(ts) * time.Microsecond - t := time.Unix(int64(micros.Seconds()), 0) - var result string if logOptions.Timestamps { - // Use the same time format as docker. result = fmt.Sprintf("%s %s\n", t.Format(time.RFC3339), msg) } else { result = fmt.Sprintf("%s\n", msg) } - // When user interrupts the 'kubectl logs $POD -f' with 'Ctrl-C', this write will fail. - // We need to close the reader to force the journalctl process to exit, also we need to - // return here to avoid goroutine leak. - if _, err := io.WriteString(w, result); err != nil { - glog.Warningf("rkt: Cannot write log to output: %v, data: %s", err, result) - return + if _, err := io.WriteString(stdout, result); err != nil { + glog.Warningf("rkt: Cannot write log line %q to output: %v", result, err) } } - - if err := scanner.Err(); err != nil { - glog.Warningf("rkt: Cannot read journal logs", err) - } } -// GetContainerLogs uses journalctl to get the logs of the container. +// GetContainerLogs uses rkt's GetLogs API to get the logs of the container. // By default, it returns a snapshot of the container log. Set |follow| to true to // stream the log. Set |follow| to false and specify the number of lines (e.g. // "100" or "all") to tail the log. // -// In rkt runtime's implementation, per container log is get via 'journalctl -m _MACHINE_ID=[MACHINE_ID] -u [APP_NAME]'. -// Where MACHINE_ID is the rkt id without dash. -// See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details. -// -// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. +// TODO(yifan): This doesn't work with lkvm stage1 yet. func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error { id, err := parseContainerID(containerID) if err != nil { return err } - // Note: this only works for rkt versions after 1.6.0 - // See https://github.com/coreos/rkt/issues/2630 - cmd := exec.Command("journalctl", "-m", fmt.Sprintf("_MACHINE_ID=%s", strings.Replace(id.uuid, "-", "", -1)), "-t", id.appName, "-a") - // Get the json structured logs. - cmd.Args = append(cmd.Args, "-o", "json") - - if logOptions.Follow { - cmd.Args = append(cmd.Args, "-f") - } - - if logOptions.TailLines != nil { - cmd.Args = append(cmd.Args, "-n", strconv.FormatInt(*logOptions.TailLines, 10)) - } - var since int64 if logOptions.SinceSeconds != nil { t := unversioned.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) @@ -141,41 +90,34 @@ func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.Conta since = logOptions.SinceTime.Unix() } - if since > 0 { - // Need to add '-r' flag if we include '--since' and '-n' at the both time, - // see https://github.com/systemd/systemd/issues/1477 - cmd.Args = append(cmd.Args, "--since", time.Unix(since, 0).Format(journalSinceLayout)) - if logOptions.TailLines != nil { - cmd.Args = append(cmd.Args, "-r") + getLogsRequest := &rktapi.GetLogsRequest{ + PodId: id.uuid, + AppName: id.appName, + Follow: logOptions.Follow, + SinceTime: since, + } + + if logOptions.TailLines != nil { + getLogsRequest.Lines = int32(*logOptions.TailLines) + } + + stream, err := r.apisvc.GetLogs(context.Background(), getLogsRequest) + if err != nil { + glog.Errorf("rkt: Failed to create log stream for pod %q: %v", format.Pod(pod), err) + return err + } + + for { + log, err := stream.Recv() + if err == io.EOF { + break } + if err != nil { + glog.Errorf("rkt: Failed to receive log for pod %q: %v", format.Pod(pod), err) + return err + } + processLines(log.Lines, logOptions, stdout, stderr) } - glog.V(4).Infof("rkt: gettings logs with command %q", cmd.Args) - outPipe, err := cmd.StdoutPipe() - if err != nil { - glog.Errorf("rkt: cannot create pipe for journalctl's stdout: %v", err) - return err - } - errPipe, err := cmd.StderrPipe() - if err != nil { - glog.Errorf("rkt: cannot create pipe for journalctl's stderr: %v", err) - return err - } - - if err := cmd.Start(); err != nil { - return err - } - - var wg sync.WaitGroup - - wg.Add(2) - - go pipeLog(&wg, logOptions, outPipe, stdout) - go pipeLog(&wg, logOptions, errPipe, stderr) - - // Wait until the logs are fed to stdout, stderr. - wg.Wait() - cmd.Wait() - return nil } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 7d8ae3fc6af..47a16dc455e 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -67,8 +67,8 @@ const ( RktType = "rkt" DefaultRktAPIServiceEndpoint = "localhost:15441" - minimumRktBinVersion = "1.7.0" - recommendedRktBinVersion = "1.7.0" + minimumRktBinVersion = "1.8.0" + recommendedRktBinVersion = "1.8.0" minimumRktApiVersion = "1.0.0-alpha" minimumSystemdVersion = "219"