From ba68b095a99170ec8119f51b9eae0a778bc6d0c9 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Mon, 5 Oct 2015 14:59:10 -0700 Subject: [PATCH 1/2] kubelet/rkt: enable getting logs for exited pods. --- pkg/kubelet/rkt/rkt.go | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index dbc6a476916..8597dfff192 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -38,6 +38,7 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -71,6 +72,10 @@ const ( dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}` defaultImageTag = "latest" + + // The layout of the time format that satisfies the `--since` option for journalctl. + // See man journalctl for more details. + journalSinceLayout = "2006-01-02 15:04:05" ) // Runtime implements the Containerruntime for rkt. The implementation @@ -1051,7 +1056,7 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus // stream the log. Set |follow| to false and specify the number of lines (e.g. // "100" or "all") to tail the log. // -// In rkt runtime's implementation, per container log is get via 'journalctl -M [rkt-$UUID] -u [APP_NAME]'. +// In rkt runtime's implementation, per container log is get via 'journalctl -m _HOSTNAME=[rkt-$UUID] -u [APP_NAME]'. // See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details. // // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. @@ -1061,15 +1066,33 @@ func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.Conta return err } - cmd := exec.Command("journalctl", "-M", fmt.Sprintf("rkt-%s", id.uuid), "-u", id.appName) + cmd := exec.Command("journalctl", "-m", fmt.Sprintf("_HOSTNAME=rkt-%s", id.uuid), "-u", id.appName, "-a") + + // If no timestamps are required, then only returns the pure logs from the app without + // any metadata. + if !logOptions.Timestamps { + cmd.Args = append(cmd.Args, "-o", "cat") + } + if logOptions.Follow { cmd.Args = append(cmd.Args, "-f") } - if logOptions.TailLines == nil { - cmd.Args = append(cmd.Args, "-a") - } else { + + if logOptions.TailLines != nil { cmd.Args = append(cmd.Args, "-n", strconv.FormatInt(*logOptions.TailLines, 10)) } + + var since int64 + if logOptions.SinceSeconds != nil { + t := unversioned.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) + since = t.Unix() + } + if logOptions.SinceTime != nil { + since = logOptions.SinceTime.Unix() + } + + cmd.Args = append(cmd.Args, "--since", time.Unix(since, 0).Format(journalSinceLayout)) + cmd.Stdout, cmd.Stderr = stdout, stderr return cmd.Run() } From 0a81443056299454151779ba70b360ee818fefcf Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Tue, 6 Oct 2015 16:47:51 -0700 Subject: [PATCH 2/2] kubelet/rkt: fetch journal logs in json format. This enables more fine-grained control over the things we want to output. Also by closing the stdout/stderr of the journalctl process when user hits `Ctrl-C` after `kubectl logs $POD -f`, this enables the journalctl process to exit. --- pkg/kubelet/rkt/log.go | 171 +++++++++++++++++++++++++++++++++++++++++ pkg/kubelet/rkt/rkt.go | 51 ------------ 2 files changed, 171 insertions(+), 51 deletions(-) create mode 100644 pkg/kubelet/rkt/log.go diff --git a/pkg/kubelet/rkt/log.go b/pkg/kubelet/rkt/log.go new file mode 100644 index 00000000000..d87068fb8af --- /dev/null +++ b/pkg/kubelet/rkt/log.go @@ -0,0 +1,171 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rkt + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "os/exec" + "strconv" + "sync" + "time" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + +const ( + // The layout of the time format that satisfies the `--since` option for journalctl. + // See man journalctl for more details. + journalSinceLayout = "2006-01-02 15:04:05" +) + +// pipeLog reads and parses the journal json object from r, +// and writes the logs line by line to w. +func pipeLog(wg *sync.WaitGroup, logOptions *api.PodLogOptions, r io.ReadCloser, w io.Writer) { + defer func() { + r.Close() + wg.Done() + }() + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + var data interface{} + b := scanner.Bytes() + + if err := json.Unmarshal(b, &data); err != nil { + glog.Warningf("rkt: Cannot unmarshal journal log, skipping line: %v", err) + continue + } + + // Decode the json object as a map so we don't have to create a struct for it. + m := data.(map[string]interface{}) + msg := m["MESSAGE"].(string) + tss := m["__REALTIME_TIMESTAMP"].(string) + + ts, err := strconv.ParseInt(tss, 0, 64) + if err != nil { + glog.Warningf("rkt: Cannot parse timestamp of journal log, skipping line: %v", err) + continue + } + + // '_REALTIME_TIMESTAMP' is the microseconds since epoch. + // http://www.freedesktop.org/software/systemd/man/sd_journal_get_realtime_usec.html#Description + micros := time.Duration(ts) * time.Microsecond + t := time.Unix(int64(micros.Seconds()), 0) + + var result string + if logOptions.Timestamps { + // Use the same time format as docker. + result = fmt.Sprintf("%s %s\n", t, msg) + } else { + result = fmt.Sprintf("%s\n", msg) + } + + // When user interrupts the 'kubectl logs $POD -f' with 'Ctrl-C', this write will fail. + // We need to close the reader to force the journalctl process to exit, also we need to + // return here to avoid goroutine leak. + if _, err := io.WriteString(w, result); err != nil { + glog.Warningf("rkt: Cannot write log to output: %v, data: %s", err, result) + return + } + } + + if err := scanner.Err(); err != nil { + glog.Warningf("rkt: Cannot read journal logs", err) + } +} + +// GetContainerLogs uses journalctl to get the logs of the container. +// By default, it returns a snapshot of the container log. Set |follow| to true to +// stream the log. Set |follow| to false and specify the number of lines (e.g. +// "100" or "all") to tail the log. +// +// In rkt runtime's implementation, per container log is get via 'journalctl -m _HOSTNAME=[rkt-$UUID] -u [APP_NAME]'. +// See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details. +// +// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. +func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error { + id, err := parseContainerID(containerID) + if err != nil { + return err + } + + cmd := exec.Command("journalctl", "-m", fmt.Sprintf("_HOSTNAME=rkt-%s", id.uuid), "-u", id.appName, "-a") + + // Get the json structured logs. + cmd.Args = append(cmd.Args, "-o", "json") + + if logOptions.Follow { + cmd.Args = append(cmd.Args, "-f") + } + + if logOptions.TailLines != nil { + cmd.Args = append(cmd.Args, "-n", strconv.FormatInt(*logOptions.TailLines, 10)) + } + + var since int64 + if logOptions.SinceSeconds != nil { + t := unversioned.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) + since = t.Unix() + } + if logOptions.SinceTime != nil { + since = logOptions.SinceTime.Unix() + } + + if since > 0 { + // Need to add '-r' flag if we include '--since' and '-n' at the both time, + // see https://github.com/systemd/systemd/issues/1477 + cmd.Args = append(cmd.Args, "--since", time.Unix(since, 0).Format(journalSinceLayout)) + if logOptions.TailLines != nil { + cmd.Args = append(cmd.Args, "-r") + } + } + + outPipe, err := cmd.StdoutPipe() + if err != nil { + glog.Errorf("rkt: cannot create pipe for journalctl's stdout", err) + return err + } + errPipe, err := cmd.StderrPipe() + if err != nil { + glog.Errorf("rkt: cannot create pipe for journalctl's stderr", err) + return err + } + + if err := cmd.Start(); err != nil { + return err + } + + var wg sync.WaitGroup + + wg.Add(2) + + go pipeLog(&wg, logOptions, outPipe, stdout) + go pipeLog(&wg, logOptions, errPipe, stderr) + + // Wait until the logs are fed to stdout, stderr. + wg.Wait() + cmd.Wait() + + return nil +} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 8597dfff192..ec99d1a5686 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -38,7 +38,6 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -72,10 +71,6 @@ const ( dockerAuthTemplate = `{"rktKind":"dockerAuth","rktVersion":"v1","registries":[%q],"credentials":{"user":%q,"password":%q}}` defaultImageTag = "latest" - - // The layout of the time format that satisfies the `--since` option for journalctl. - // See man journalctl for more details. - journalSinceLayout = "2006-01-02 15:04:05" ) // Runtime implements the Containerruntime for rkt. The implementation @@ -1051,52 +1046,6 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus return nil } -// GetContainerLogs uses journalctl to get the logs of the container. -// By default, it returns a snapshot of the container log. Set |follow| to true to -// stream the log. Set |follow| to false and specify the number of lines (e.g. -// "100" or "all") to tail the log. -// -// In rkt runtime's implementation, per container log is get via 'journalctl -m _HOSTNAME=[rkt-$UUID] -u [APP_NAME]'. -// See https://github.com/coreos/rkt/blob/master/Documentation/commands.md#logging for more details. -// -// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *Runtime) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error { - id, err := parseContainerID(containerID) - if err != nil { - return err - } - - cmd := exec.Command("journalctl", "-m", fmt.Sprintf("_HOSTNAME=rkt-%s", id.uuid), "-u", id.appName, "-a") - - // If no timestamps are required, then only returns the pure logs from the app without - // any metadata. - if !logOptions.Timestamps { - cmd.Args = append(cmd.Args, "-o", "cat") - } - - if logOptions.Follow { - cmd.Args = append(cmd.Args, "-f") - } - - if logOptions.TailLines != nil { - cmd.Args = append(cmd.Args, "-n", strconv.FormatInt(*logOptions.TailLines, 10)) - } - - var since int64 - if logOptions.SinceSeconds != nil { - t := unversioned.Now().Add(-time.Duration(*logOptions.SinceSeconds) * time.Second) - since = t.Unix() - } - if logOptions.SinceTime != nil { - since = logOptions.SinceTime.Unix() - } - - cmd.Args = append(cmd.Args, "--since", time.Unix(since, 0).Format(journalSinceLayout)) - - cmd.Stdout, cmd.Stderr = stdout, stderr - return cmd.Run() -} - // GarbageCollect collects the pods/containers. // TODO(yifan): Enforce the gc policy, also, it would be better if we can // just GC kubernetes pods.