From de73e4596afb021b7fa731b07e8fa06ecbdf7701 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Fri, 17 Nov 2017 04:39:36 +0000 Subject: [PATCH 1/2] Add constants in CRI. --- .../apis/cri/v1alpha1/runtime/constants.go | 28 +++++ pkg/kubelet/kuberuntime/logs/BUILD | 1 + pkg/kubelet/kuberuntime/logs/logs.go | 110 +++++++++--------- pkg/kubelet/kuberuntime/logs/logs_test.go | 43 ++++--- 4 files changed, 111 insertions(+), 71 deletions(-) diff --git a/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go b/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go index 27c42c5c516..04cac264d14 100644 --- a/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go +++ b/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go @@ -25,3 +25,31 @@ const ( // NetworkReady means the runtime network is up and ready to accept containers which require network. NetworkReady = "NetworkReady" ) + +// LogStreamType is the type of the stream in CRI container log. +type LogStreamType string + +const ( + // Stdout is the stream type for stdout. + Stdout LogStreamType = "stdout" + // Stderr is the stream type for stderr. + Stderr LogStreamType = "stderr" +) + +// LogTag is the tag of a log line in CRI container log. +// Currently defined log tags: +// * First tag: Partial/End - P/E. +// The field in the container log format can be extended to include multiple +// tags by using a delimiter, but changes should be rare. If it becomes clear +// that better extensibility is desired, a more extensible format (e.g., json) +// should be adopted as a replacement and/or addition. +type LogTag string + +const ( + // LogTagPartial means the line is part of multiple lines. + LogTagPartial LogTag = "P" + // LogTagFull means the line is a single full line or the end of multiple lines. + LogTagFull LogTag = "F" + // LogTagDelimiter is the delimiter for different log tags. + LogTagDelimiter = ":" +) diff --git a/pkg/kubelet/kuberuntime/logs/BUILD b/pkg/kubelet/kuberuntime/logs/BUILD index a7f94599590..211a0b2c0fe 100644 --- a/pkg/kubelet/kuberuntime/logs/BUILD +++ b/pkg/kubelet/kuberuntime/logs/BUILD @@ -22,6 +22,7 @@ go_test( importpath = "k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs", library = ":go_default_library", deps = [ + "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index ebfeb69860a..e029d40d6c0 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -45,13 +45,7 @@ import ( // * If the rotation is using copytruncate, we'll be reading at the original position and get nothing. // TODO(random-liu): Support log rotation. -// streamType is the type of the stream. -type streamType string - const ( - stderrType streamType = "stderr" - stdoutType streamType = "stdout" - // timeFormat is the time format used in the log. timeFormat = time.RFC3339Nano // blockSize is the block size used in tail. @@ -66,14 +60,16 @@ const ( var ( // eol is the end-of-line sign in the log. eol = []byte{'\n'} - // delimiter is the delimiter for timestamp and streamtype in log line. + // delimiter is the delimiter for timestamp and stream type in log line. delimiter = []byte{' '} + // tagDelimiter is the delimiter for log tags. + tagDelimiter = []byte(runtimeapi.LogTagDelimiter) ) // logMessage is the CRI internal log type. type logMessage struct { timestamp time.Time - stream streamType + stream runtimeapi.LogStreamType log []byte } @@ -126,8 +122,8 @@ var parseFuncs = []parseFunc{ } // parseCRILog parses logs in CRI log format. CRI Log format example: -// 2016-10-06T00:17:09.669794202Z stdout log content 1 -// 2016-10-06T00:17:09.669794203Z stderr log content 2 +// 2016-10-06T00:17:09.669794202Z stdout P log content 1 +// 2016-10-06T00:17:09.669794203Z stderr F log content 2 func parseCRILog(log []byte, msg *logMessage) error { var err error // Parse timestamp @@ -146,11 +142,25 @@ func parseCRILog(log []byte, msg *logMessage) error { if idx < 0 { return fmt.Errorf("stream type is not found") } - msg.stream = streamType(log[:idx]) - if msg.stream != stdoutType && msg.stream != stderrType { + msg.stream = runtimeapi.LogStreamType(log[:idx]) + if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr { return fmt.Errorf("unexpected stream type %q", msg.stream) } + // Parse log tag + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("log tag is not found") + } + // Keep this forward compatible. + tags := bytes.Split(log[:idx], tagDelimiter) + partial := (runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial) + // Trim the tailing new line if this is a partial line. + if partial && len(log) > 0 && log[len(log)-1] == '\n' { + log = log[:len(log)-1] + } + // Get log content msg.log = log[idx+1:] @@ -170,7 +180,7 @@ func parseDockerJSONLog(log []byte, msg *logMessage) error { return fmt.Errorf("failed with %v to unmarshal log %q", err, l) } msg.timestamp = l.Created - msg.stream = streamType(l.Stream) + msg.stream = runtimeapi.LogStreamType(l.Stream) msg.log = []byte(l.Log) return nil } @@ -230,9 +240,9 @@ func (w *logWriter) write(msg *logMessage) error { // Get the proper stream to write to. var stream io.Writer switch msg.stream { - case stdoutType: + case runtimeapi.Stdout: stream = w.stdout - case stderrType: + case runtimeapi.Stderr: stream = w.stderr default: return fmt.Errorf("unexpected stream type %q", msg.stream) @@ -277,63 +287,47 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna // Do not create watcher here because it is not needed if `Follow` is false. var watcher *fsnotify.Watcher var parse parseFunc + var stop bool writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} for { + if stop { + glog.V(2).Infof("Finish parsing log file %q", path) + return nil + } l, err := r.ReadBytes(eol[0]) if err != nil { if err != io.EOF { // This is an real error return fmt.Errorf("failed to read log file %q: %v", path, err) } - if !opts.follow { - // Return directly when reading to the end if not follow. - if len(l) > 0 { - glog.Warningf("Incomplete line in log file %q: %q", path, l) - if parse == nil { - // Intialize the log parsing function. - parse, err = getParseFunc(l) - if err != nil { - return fmt.Errorf("failed to get parse function: %v", err) - } + if opts.follow { + // Reset seek so that if this is an incomplete line, + // it will be read again. + if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil { + return fmt.Errorf("failed to reset seek in log file %q: %v", path, err) + } + if watcher == nil { + // Intialize the watcher if it has not been initialized yet. + if watcher, err = fsnotify.NewWatcher(); err != nil { + return fmt.Errorf("failed to create fsnotify watcher: %v", err) } - // Log a warning and exit if we can't parse the partial line. - if err := parse(l, msg); err != nil { - glog.Warningf("Failed with err %v when parsing partial line for log file %q: %q", err, path, l) - return nil - } - // Write the log line into the stream. - if err := writer.write(msg); err != nil { - if err == errMaximumWrite { - glog.V(2).Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes) - return nil - } - glog.Errorf("Failed with err %v when writing partial log for log file %q: %+v", err, path, msg) - return err + defer watcher.Close() + if err := watcher.Add(f.Name()); err != nil { + return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) } } - glog.V(2).Infof("Finish parsing log file %q", path) - return nil - } - // Reset seek so that if this is an incomplete line, - // it will be read again. - if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil { - return fmt.Errorf("failed to reset seek in log file %q: %v", path, err) - } - if watcher == nil { - // Intialize the watcher if it has not been initialized yet. - if watcher, err = fsnotify.NewWatcher(); err != nil { - return fmt.Errorf("failed to create fsnotify watcher: %v", err) - } - defer watcher.Close() - if err := watcher.Add(f.Name()); err != nil { - return fmt.Errorf("failed to watch file %q: %v", f.Name(), err) + // Wait until the next log change. + if found, err := waitLogs(containerID, watcher, runtimeService); !found { + return err } + continue } - // Wait until the next log change. - if found, err := waitLogs(containerID, watcher, runtimeService); !found { - return err + // Should stop after writing the remaining content. + stop = true + if len(l) == 0 { + continue } - continue + glog.Warningf("Incomplete line in log file %q: %q", path, l) } if parse == nil { // Intialize the log parsing function. diff --git a/pkg/kubelet/kuberuntime/logs/logs_test.go b/pkg/kubelet/kuberuntime/logs/logs_test.go index 5c4eaae9b30..17b074a01fd 100644 --- a/pkg/kubelet/kuberuntime/logs/logs_test.go +++ b/pkg/kubelet/kuberuntime/logs/logs_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) func TestLogOptions(t *testing.T) { @@ -78,7 +79,7 @@ func TestParseLog(t *testing.T) { line: `{"log":"docker stdout test log","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}` + "\n", msg: &logMessage{ timestamp: timestamp, - stream: stdoutType, + stream: runtimeapi.Stdout, log: []byte("docker stdout test log"), }, }, @@ -86,23 +87,23 @@ func TestParseLog(t *testing.T) { line: `{"log":"docker stderr test log","stream":"stderr","time":"2016-10-20T18:39:20.57606443Z"}` + "\n", msg: &logMessage{ timestamp: timestamp, - stream: stderrType, + stream: runtimeapi.Stderr, log: []byte("docker stderr test log"), }, }, { // CRI log format stdout - line: "2016-10-20T18:39:20.57606443Z stdout cri stdout test log\n", + line: "2016-10-20T18:39:20.57606443Z stdout F cri stdout test log\n", msg: &logMessage{ timestamp: timestamp, - stream: stdoutType, + stream: runtimeapi.Stdout, log: []byte("cri stdout test log\n"), }, }, { // CRI log format stderr - line: "2016-10-20T18:39:20.57606443Z stderr cri stderr test log\n", + line: "2016-10-20T18:39:20.57606443Z stderr F cri stderr test log\n", msg: &logMessage{ timestamp: timestamp, - stream: stderrType, + stream: runtimeapi.Stderr, log: []byte("cri stderr test log\n"), }, }, @@ -111,6 +112,22 @@ func TestParseLog(t *testing.T) { msg: &logMessage{}, err: true, }, + { // Partial CRI log line + line: "2016-10-20T18:39:20.57606443Z stdout P cri stdout partial test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: runtimeapi.Stdout, + log: []byte("cri stdout partial test log"), + }, + }, + { // Partial CRI log line with multiple log tags. + line: "2016-10-20T18:39:20.57606443Z stdout P:TAG1:TAG2 cri stdout partial test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: runtimeapi.Stdout, + log: []byte("cri stdout partial test log"), + }, + }, } { t.Logf("TestCase #%d: %+v", c, test) parse, err := getParseFunc([]byte(test.line)) @@ -130,26 +147,26 @@ func TestWriteLogs(t *testing.T) { log := "abcdefg\n" for c, test := range []struct { - stream streamType + stream runtimeapi.LogStreamType since time.Time timestamp bool expectStdout string expectStderr string }{ { // stderr log - stream: stderrType, + stream: runtimeapi.Stderr, expectStderr: log, }, { // stdout log - stream: stdoutType, + stream: runtimeapi.Stdout, expectStdout: log, }, { // since is after timestamp - stream: stdoutType, + stream: runtimeapi.Stdout, since: timestamp.Add(1 * time.Second), }, { // timestamp enabled - stream: stderrType, + stream: runtimeapi.Stderr, timestamp: true, expectStderr: timestamp.Format(timeFormat) + " " + log, }, @@ -226,13 +243,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) { stderrBuf := bytes.NewBuffer(nil) w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)}) for i := 0; i < test.stdoutLines; i++ { - msg.stream = stdoutType + msg.stream = runtimeapi.Stdout if err := w.write(msg); err != nil { assert.EqualError(t, err, errMaximumWrite.Error()) } } for i := 0; i < test.stderrLines; i++ { - msg.stream = stderrType + msg.stream = runtimeapi.Stderr if err := w.write(msg); err != nil { assert.EqualError(t, err, errMaximumWrite.Error()) } From 2bc0532eb31dc15bcc0fe21a9bb359520908f4b0 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Fri, 17 Nov 2017 05:42:59 +0000 Subject: [PATCH 2/2] Add fluentd support. --- cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml | 2 +- cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml b/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml index 161e4a62ebf..5c3bb09f632 100644 --- a/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml +++ b/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml @@ -114,7 +114,7 @@ data: time_format %Y-%m-%dT%H:%M:%S.%NZ - format /^(? diff --git a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml index bbd02f8f5ec..fa002d70bc3 100644 --- a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml +++ b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml @@ -58,7 +58,7 @@ data: time_format %Y-%m-%dT%H:%M:%S.%NZ - format /^(?