diff --git a/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml b/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml
index 161e4a62ebf..5c3bb09f632 100644
--- a/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml
+++ b/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml
@@ -114,7 +114,7 @@ data:
time_format %Y-%m-%dT%H:%M:%S.%NZ
- format /^(?
diff --git a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml
index bbd02f8f5ec..fa002d70bc3 100644
--- a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml
+++ b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml
@@ -58,7 +58,7 @@ data:
time_format %Y-%m-%dT%H:%M:%S.%NZ
- format /^(?.+) (?stdout|stderr) (?.*)$/
+ format /^(?.+) (?stdout|stderr) (?.*) (?.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
diff --git a/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go b/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go
index 27c42c5c516..04cac264d14 100644
--- a/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go
+++ b/pkg/kubelet/apis/cri/v1alpha1/runtime/constants.go
@@ -25,3 +25,31 @@ const (
// NetworkReady means the runtime network is up and ready to accept containers which require network.
NetworkReady = "NetworkReady"
)
+
+// LogStreamType is the type of the stream in CRI container log.
+type LogStreamType string
+
+const (
+ // Stdout is the stream type for stdout.
+ Stdout LogStreamType = "stdout"
+ // Stderr is the stream type for stderr.
+ Stderr LogStreamType = "stderr"
+)
+
+// LogTag is the tag of a log line in CRI container log.
+// Currently defined log tags:
+// * First tag: Partial/End - P/E.
+// The field in the container log format can be extended to include multiple
+// tags by using a delimiter, but changes should be rare. If it becomes clear
+// that better extensibility is desired, a more extensible format (e.g., json)
+// should be adopted as a replacement and/or addition.
+type LogTag string
+
+const (
+ // LogTagPartial means the line is part of multiple lines.
+ LogTagPartial LogTag = "P"
+ // LogTagFull means the line is a single full line or the end of multiple lines.
+ LogTagFull LogTag = "F"
+ // LogTagDelimiter is the delimiter for different log tags.
+ LogTagDelimiter = ":"
+)
diff --git a/pkg/kubelet/kuberuntime/logs/BUILD b/pkg/kubelet/kuberuntime/logs/BUILD
index a7f94599590..211a0b2c0fe 100644
--- a/pkg/kubelet/kuberuntime/logs/BUILD
+++ b/pkg/kubelet/kuberuntime/logs/BUILD
@@ -22,6 +22,7 @@ go_test(
importpath = "k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs",
library = ":go_default_library",
deps = [
+ "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go
index ebfeb69860a..e029d40d6c0 100644
--- a/pkg/kubelet/kuberuntime/logs/logs.go
+++ b/pkg/kubelet/kuberuntime/logs/logs.go
@@ -45,13 +45,7 @@ import (
// * If the rotation is using copytruncate, we'll be reading at the original position and get nothing.
// TODO(random-liu): Support log rotation.
-// streamType is the type of the stream.
-type streamType string
-
const (
- stderrType streamType = "stderr"
- stdoutType streamType = "stdout"
-
// timeFormat is the time format used in the log.
timeFormat = time.RFC3339Nano
// blockSize is the block size used in tail.
@@ -66,14 +60,16 @@ const (
var (
// eol is the end-of-line sign in the log.
eol = []byte{'\n'}
- // delimiter is the delimiter for timestamp and streamtype in log line.
+ // delimiter is the delimiter for timestamp and stream type in log line.
delimiter = []byte{' '}
+ // tagDelimiter is the delimiter for log tags.
+ tagDelimiter = []byte(runtimeapi.LogTagDelimiter)
)
// logMessage is the CRI internal log type.
type logMessage struct {
timestamp time.Time
- stream streamType
+ stream runtimeapi.LogStreamType
log []byte
}
@@ -126,8 +122,8 @@ var parseFuncs = []parseFunc{
}
// parseCRILog parses logs in CRI log format. CRI Log format example:
-// 2016-10-06T00:17:09.669794202Z stdout log content 1
-// 2016-10-06T00:17:09.669794203Z stderr log content 2
+// 2016-10-06T00:17:09.669794202Z stdout P log content 1
+// 2016-10-06T00:17:09.669794203Z stderr F log content 2
func parseCRILog(log []byte, msg *logMessage) error {
var err error
// Parse timestamp
@@ -146,11 +142,25 @@ func parseCRILog(log []byte, msg *logMessage) error {
if idx < 0 {
return fmt.Errorf("stream type is not found")
}
- msg.stream = streamType(log[:idx])
- if msg.stream != stdoutType && msg.stream != stderrType {
+ msg.stream = runtimeapi.LogStreamType(log[:idx])
+ if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr {
return fmt.Errorf("unexpected stream type %q", msg.stream)
}
+ // Parse log tag
+ log = log[idx+1:]
+ idx = bytes.Index(log, delimiter)
+ if idx < 0 {
+ return fmt.Errorf("log tag is not found")
+ }
+ // Keep this forward compatible.
+ tags := bytes.Split(log[:idx], tagDelimiter)
+ partial := (runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial)
+ // Trim the tailing new line if this is a partial line.
+ if partial && len(log) > 0 && log[len(log)-1] == '\n' {
+ log = log[:len(log)-1]
+ }
+
// Get log content
msg.log = log[idx+1:]
@@ -170,7 +180,7 @@ func parseDockerJSONLog(log []byte, msg *logMessage) error {
return fmt.Errorf("failed with %v to unmarshal log %q", err, l)
}
msg.timestamp = l.Created
- msg.stream = streamType(l.Stream)
+ msg.stream = runtimeapi.LogStreamType(l.Stream)
msg.log = []byte(l.Log)
return nil
}
@@ -230,9 +240,9 @@ func (w *logWriter) write(msg *logMessage) error {
// Get the proper stream to write to.
var stream io.Writer
switch msg.stream {
- case stdoutType:
+ case runtimeapi.Stdout:
stream = w.stdout
- case stderrType:
+ case runtimeapi.Stderr:
stream = w.stderr
default:
return fmt.Errorf("unexpected stream type %q", msg.stream)
@@ -277,63 +287,47 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna
// Do not create watcher here because it is not needed if `Follow` is false.
var watcher *fsnotify.Watcher
var parse parseFunc
+ var stop bool
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
for {
+ if stop {
+ glog.V(2).Infof("Finish parsing log file %q", path)
+ return nil
+ }
l, err := r.ReadBytes(eol[0])
if err != nil {
if err != io.EOF { // This is an real error
return fmt.Errorf("failed to read log file %q: %v", path, err)
}
- if !opts.follow {
- // Return directly when reading to the end if not follow.
- if len(l) > 0 {
- glog.Warningf("Incomplete line in log file %q: %q", path, l)
- if parse == nil {
- // Intialize the log parsing function.
- parse, err = getParseFunc(l)
- if err != nil {
- return fmt.Errorf("failed to get parse function: %v", err)
- }
+ if opts.follow {
+ // Reset seek so that if this is an incomplete line,
+ // it will be read again.
+ if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil {
+ return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
+ }
+ if watcher == nil {
+ // Intialize the watcher if it has not been initialized yet.
+ if watcher, err = fsnotify.NewWatcher(); err != nil {
+ return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
- // Log a warning and exit if we can't parse the partial line.
- if err := parse(l, msg); err != nil {
- glog.Warningf("Failed with err %v when parsing partial line for log file %q: %q", err, path, l)
- return nil
- }
- // Write the log line into the stream.
- if err := writer.write(msg); err != nil {
- if err == errMaximumWrite {
- glog.V(2).Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes)
- return nil
- }
- glog.Errorf("Failed with err %v when writing partial log for log file %q: %+v", err, path, msg)
- return err
+ defer watcher.Close()
+ if err := watcher.Add(f.Name()); err != nil {
+ return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
}
- glog.V(2).Infof("Finish parsing log file %q", path)
- return nil
- }
- // Reset seek so that if this is an incomplete line,
- // it will be read again.
- if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil {
- return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
- }
- if watcher == nil {
- // Intialize the watcher if it has not been initialized yet.
- if watcher, err = fsnotify.NewWatcher(); err != nil {
- return fmt.Errorf("failed to create fsnotify watcher: %v", err)
- }
- defer watcher.Close()
- if err := watcher.Add(f.Name()); err != nil {
- return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
+ // Wait until the next log change.
+ if found, err := waitLogs(containerID, watcher, runtimeService); !found {
+ return err
}
+ continue
}
- // Wait until the next log change.
- if found, err := waitLogs(containerID, watcher, runtimeService); !found {
- return err
+ // Should stop after writing the remaining content.
+ stop = true
+ if len(l) == 0 {
+ continue
}
- continue
+ glog.Warningf("Incomplete line in log file %q: %q", path, l)
}
if parse == nil {
// Intialize the log parsing function.
diff --git a/pkg/kubelet/kuberuntime/logs/logs_test.go b/pkg/kubelet/kuberuntime/logs/logs_test.go
index 5c4eaae9b30..17b074a01fd 100644
--- a/pkg/kubelet/kuberuntime/logs/logs_test.go
+++ b/pkg/kubelet/kuberuntime/logs/logs_test.go
@@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
func TestLogOptions(t *testing.T) {
@@ -78,7 +79,7 @@ func TestParseLog(t *testing.T) {
line: `{"log":"docker stdout test log","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}` + "\n",
msg: &logMessage{
timestamp: timestamp,
- stream: stdoutType,
+ stream: runtimeapi.Stdout,
log: []byte("docker stdout test log"),
},
},
@@ -86,23 +87,23 @@ func TestParseLog(t *testing.T) {
line: `{"log":"docker stderr test log","stream":"stderr","time":"2016-10-20T18:39:20.57606443Z"}` + "\n",
msg: &logMessage{
timestamp: timestamp,
- stream: stderrType,
+ stream: runtimeapi.Stderr,
log: []byte("docker stderr test log"),
},
},
{ // CRI log format stdout
- line: "2016-10-20T18:39:20.57606443Z stdout cri stdout test log\n",
+ line: "2016-10-20T18:39:20.57606443Z stdout F cri stdout test log\n",
msg: &logMessage{
timestamp: timestamp,
- stream: stdoutType,
+ stream: runtimeapi.Stdout,
log: []byte("cri stdout test log\n"),
},
},
{ // CRI log format stderr
- line: "2016-10-20T18:39:20.57606443Z stderr cri stderr test log\n",
+ line: "2016-10-20T18:39:20.57606443Z stderr F cri stderr test log\n",
msg: &logMessage{
timestamp: timestamp,
- stream: stderrType,
+ stream: runtimeapi.Stderr,
log: []byte("cri stderr test log\n"),
},
},
@@ -111,6 +112,22 @@ func TestParseLog(t *testing.T) {
msg: &logMessage{},
err: true,
},
+ { // Partial CRI log line
+ line: "2016-10-20T18:39:20.57606443Z stdout P cri stdout partial test log\n",
+ msg: &logMessage{
+ timestamp: timestamp,
+ stream: runtimeapi.Stdout,
+ log: []byte("cri stdout partial test log"),
+ },
+ },
+ { // Partial CRI log line with multiple log tags.
+ line: "2016-10-20T18:39:20.57606443Z stdout P:TAG1:TAG2 cri stdout partial test log\n",
+ msg: &logMessage{
+ timestamp: timestamp,
+ stream: runtimeapi.Stdout,
+ log: []byte("cri stdout partial test log"),
+ },
+ },
} {
t.Logf("TestCase #%d: %+v", c, test)
parse, err := getParseFunc([]byte(test.line))
@@ -130,26 +147,26 @@ func TestWriteLogs(t *testing.T) {
log := "abcdefg\n"
for c, test := range []struct {
- stream streamType
+ stream runtimeapi.LogStreamType
since time.Time
timestamp bool
expectStdout string
expectStderr string
}{
{ // stderr log
- stream: stderrType,
+ stream: runtimeapi.Stderr,
expectStderr: log,
},
{ // stdout log
- stream: stdoutType,
+ stream: runtimeapi.Stdout,
expectStdout: log,
},
{ // since is after timestamp
- stream: stdoutType,
+ stream: runtimeapi.Stdout,
since: timestamp.Add(1 * time.Second),
},
{ // timestamp enabled
- stream: stderrType,
+ stream: runtimeapi.Stderr,
timestamp: true,
expectStderr: timestamp.Format(timeFormat) + " " + log,
},
@@ -226,13 +243,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
stderrBuf := bytes.NewBuffer(nil)
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)})
for i := 0; i < test.stdoutLines; i++ {
- msg.stream = stdoutType
+ msg.stream = runtimeapi.Stdout
if err := w.write(msg); err != nil {
assert.EqualError(t, err, errMaximumWrite.Error())
}
}
for i := 0; i < test.stderrLines; i++ {
- msg.stream = stderrType
+ msg.stream = runtimeapi.Stderr
if err := w.write(msg); err != nil {
assert.EqualError(t, err, errMaximumWrite.Error())
}