Merge pull request #55922 from Random-Liu/add-partical-cri-log

Automatic merge from submit-queue (batch tested with PRs 55938, 56055, 53385, 55796, 55922). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add partial CRI container log support.

For https://github.com/kubernetes/kubernetes/issues/44976.

New CRI log format:
```
TIMESTAMP STREAM TAG CONTENT
2016-10-06T00:17:09.669794202Z stdout P log content 1
2016-10-06T00:17:09.669794203Z stdout P log content 2
```

Although unlikely, if in the future we need more metadata in each line, we could extend TAG into multiple tags splitted by `:`.

@yujuhong @feiskyer @crassirostris @mrunalp @abhi @mikebrow 
/cc @kubernetes/sig-node-api-reviews @kubernetes/sig-instrumentation-api-reviews 

**Release note**:

```release-note
A new field is added to CRI container log format to support splitting a long log line into multiple lines.
```
This commit is contained in:
Kubernetes Submit Queue 2017-11-21 07:43:53 -08:00 committed by GitHub
commit 164317879b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 73 deletions

View File

@ -114,7 +114,7 @@ data:
time_format %Y-%m-%dT%H:%M:%S.%NZ
</pattern>
<pattern>
format /^(?<time>.+) (?<stream>stdout|stderr) (?<log>.*)$/
format /^(?<time>.+) (?<stream>stdout|stderr) (?<tag>.*) (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
</pattern>
</source>

View File

@ -58,7 +58,7 @@ data:
time_format %Y-%m-%dT%H:%M:%S.%NZ
</pattern>
<pattern>
format /^(?<time>.+) (?<stream>stdout|stderr) (?<log>.*)$/
format /^(?<time>.+) (?<stream>stdout|stderr) (?<tag>.*) (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
</pattern>
</source>

View File

@ -25,3 +25,31 @@ const (
// NetworkReady means the runtime network is up and ready to accept containers which require network.
NetworkReady = "NetworkReady"
)
// LogStreamType is the type of the stream in CRI container log.
type LogStreamType string
const (
// Stdout is the stream type for stdout.
Stdout LogStreamType = "stdout"
// Stderr is the stream type for stderr.
Stderr LogStreamType = "stderr"
)
// LogTag is the tag of a log line in CRI container log.
// Currently defined log tags:
// * First tag: Partial/End - P/E.
// The field in the container log format can be extended to include multiple
// tags by using a delimiter, but changes should be rare. If it becomes clear
// that better extensibility is desired, a more extensible format (e.g., json)
// should be adopted as a replacement and/or addition.
type LogTag string
const (
// LogTagPartial means the line is part of multiple lines.
LogTagPartial LogTag = "P"
// LogTagFull means the line is a single full line or the end of multiple lines.
LogTagFull LogTag = "F"
// LogTagDelimiter is the delimiter for different log tags.
LogTagDelimiter = ":"
)

View File

@ -22,6 +22,7 @@ go_test(
importpath = "k8s.io/kubernetes/pkg/kubelet/kuberuntime/logs",
library = ":go_default_library",
deps = [
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -45,13 +45,7 @@ import (
// * If the rotation is using copytruncate, we'll be reading at the original position and get nothing.
// TODO(random-liu): Support log rotation.
// streamType is the type of the stream.
type streamType string
const (
stderrType streamType = "stderr"
stdoutType streamType = "stdout"
// timeFormat is the time format used in the log.
timeFormat = time.RFC3339Nano
// blockSize is the block size used in tail.
@ -66,14 +60,16 @@ const (
var (
// eol is the end-of-line sign in the log.
eol = []byte{'\n'}
// delimiter is the delimiter for timestamp and streamtype in log line.
// delimiter is the delimiter for timestamp and stream type in log line.
delimiter = []byte{' '}
// tagDelimiter is the delimiter for log tags.
tagDelimiter = []byte(runtimeapi.LogTagDelimiter)
)
// logMessage is the CRI internal log type.
type logMessage struct {
timestamp time.Time
stream streamType
stream runtimeapi.LogStreamType
log []byte
}
@ -126,8 +122,8 @@ var parseFuncs = []parseFunc{
}
// parseCRILog parses logs in CRI log format. CRI Log format example:
// 2016-10-06T00:17:09.669794202Z stdout log content 1
// 2016-10-06T00:17:09.669794203Z stderr log content 2
// 2016-10-06T00:17:09.669794202Z stdout P log content 1
// 2016-10-06T00:17:09.669794203Z stderr F log content 2
func parseCRILog(log []byte, msg *logMessage) error {
var err error
// Parse timestamp
@ -146,11 +142,25 @@ func parseCRILog(log []byte, msg *logMessage) error {
if idx < 0 {
return fmt.Errorf("stream type is not found")
}
msg.stream = streamType(log[:idx])
if msg.stream != stdoutType && msg.stream != stderrType {
msg.stream = runtimeapi.LogStreamType(log[:idx])
if msg.stream != runtimeapi.Stdout && msg.stream != runtimeapi.Stderr {
return fmt.Errorf("unexpected stream type %q", msg.stream)
}
// Parse log tag
log = log[idx+1:]
idx = bytes.Index(log, delimiter)
if idx < 0 {
return fmt.Errorf("log tag is not found")
}
// Keep this forward compatible.
tags := bytes.Split(log[:idx], tagDelimiter)
partial := (runtimeapi.LogTag(tags[0]) == runtimeapi.LogTagPartial)
// Trim the tailing new line if this is a partial line.
if partial && len(log) > 0 && log[len(log)-1] == '\n' {
log = log[:len(log)-1]
}
// Get log content
msg.log = log[idx+1:]
@ -170,7 +180,7 @@ func parseDockerJSONLog(log []byte, msg *logMessage) error {
return fmt.Errorf("failed with %v to unmarshal log %q", err, l)
}
msg.timestamp = l.Created
msg.stream = streamType(l.Stream)
msg.stream = runtimeapi.LogStreamType(l.Stream)
msg.log = []byte(l.Log)
return nil
}
@ -230,9 +240,9 @@ func (w *logWriter) write(msg *logMessage) error {
// Get the proper stream to write to.
var stream io.Writer
switch msg.stream {
case stdoutType:
case runtimeapi.Stdout:
stream = w.stdout
case stderrType:
case runtimeapi.Stderr:
stream = w.stderr
default:
return fmt.Errorf("unexpected stream type %q", msg.stream)
@ -277,63 +287,47 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna
// Do not create watcher here because it is not needed if `Follow` is false.
var watcher *fsnotify.Watcher
var parse parseFunc
var stop bool
writer := newLogWriter(stdout, stderr, opts)
msg := &logMessage{}
for {
if stop {
glog.V(2).Infof("Finish parsing log file %q", path)
return nil
}
l, err := r.ReadBytes(eol[0])
if err != nil {
if err != io.EOF { // This is an real error
return fmt.Errorf("failed to read log file %q: %v", path, err)
}
if !opts.follow {
// Return directly when reading to the end if not follow.
if len(l) > 0 {
glog.Warningf("Incomplete line in log file %q: %q", path, l)
if parse == nil {
// Intialize the log parsing function.
parse, err = getParseFunc(l)
if err != nil {
return fmt.Errorf("failed to get parse function: %v", err)
}
if opts.follow {
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
}
if watcher == nil {
// Intialize the watcher if it has not been initialized yet.
if watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
// Log a warning and exit if we can't parse the partial line.
if err := parse(l, msg); err != nil {
glog.Warningf("Failed with err %v when parsing partial line for log file %q: %q", err, path, l)
return nil
}
// Write the log line into the stream.
if err := writer.write(msg); err != nil {
if err == errMaximumWrite {
glog.V(2).Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", path, opts.bytes)
return nil
}
glog.Errorf("Failed with err %v when writing partial log for log file %q: %+v", err, path, msg)
return err
defer watcher.Close()
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
}
glog.V(2).Infof("Finish parsing log file %q", path)
return nil
}
// Reset seek so that if this is an incomplete line,
// it will be read again.
if _, err := f.Seek(-int64(len(l)), os.SEEK_CUR); err != nil {
return fmt.Errorf("failed to reset seek in log file %q: %v", path, err)
}
if watcher == nil {
// Intialize the watcher if it has not been initialized yet.
if watcher, err = fsnotify.NewWatcher(); err != nil {
return fmt.Errorf("failed to create fsnotify watcher: %v", err)
}
defer watcher.Close()
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
// Wait until the next log change.
if found, err := waitLogs(containerID, watcher, runtimeService); !found {
return err
}
continue
}
// Wait until the next log change.
if found, err := waitLogs(containerID, watcher, runtimeService); !found {
return err
// Should stop after writing the remaining content.
stop = true
if len(l) == 0 {
continue
}
continue
glog.Warningf("Incomplete line in log file %q: %q", path, l)
}
if parse == nil {
// Intialize the log parsing function.

View File

@ -25,6 +25,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
func TestLogOptions(t *testing.T) {
@ -78,7 +79,7 @@ func TestParseLog(t *testing.T) {
line: `{"log":"docker stdout test log","stream":"stdout","time":"2016-10-20T18:39:20.57606443Z"}` + "\n",
msg: &logMessage{
timestamp: timestamp,
stream: stdoutType,
stream: runtimeapi.Stdout,
log: []byte("docker stdout test log"),
},
},
@ -86,23 +87,23 @@ func TestParseLog(t *testing.T) {
line: `{"log":"docker stderr test log","stream":"stderr","time":"2016-10-20T18:39:20.57606443Z"}` + "\n",
msg: &logMessage{
timestamp: timestamp,
stream: stderrType,
stream: runtimeapi.Stderr,
log: []byte("docker stderr test log"),
},
},
{ // CRI log format stdout
line: "2016-10-20T18:39:20.57606443Z stdout cri stdout test log\n",
line: "2016-10-20T18:39:20.57606443Z stdout F cri stdout test log\n",
msg: &logMessage{
timestamp: timestamp,
stream: stdoutType,
stream: runtimeapi.Stdout,
log: []byte("cri stdout test log\n"),
},
},
{ // CRI log format stderr
line: "2016-10-20T18:39:20.57606443Z stderr cri stderr test log\n",
line: "2016-10-20T18:39:20.57606443Z stderr F cri stderr test log\n",
msg: &logMessage{
timestamp: timestamp,
stream: stderrType,
stream: runtimeapi.Stderr,
log: []byte("cri stderr test log\n"),
},
},
@ -111,6 +112,22 @@ func TestParseLog(t *testing.T) {
msg: &logMessage{},
err: true,
},
{ // Partial CRI log line
line: "2016-10-20T18:39:20.57606443Z stdout P cri stdout partial test log\n",
msg: &logMessage{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("cri stdout partial test log"),
},
},
{ // Partial CRI log line with multiple log tags.
line: "2016-10-20T18:39:20.57606443Z stdout P:TAG1:TAG2 cri stdout partial test log\n",
msg: &logMessage{
timestamp: timestamp,
stream: runtimeapi.Stdout,
log: []byte("cri stdout partial test log"),
},
},
} {
t.Logf("TestCase #%d: %+v", c, test)
parse, err := getParseFunc([]byte(test.line))
@ -130,26 +147,26 @@ func TestWriteLogs(t *testing.T) {
log := "abcdefg\n"
for c, test := range []struct {
stream streamType
stream runtimeapi.LogStreamType
since time.Time
timestamp bool
expectStdout string
expectStderr string
}{
{ // stderr log
stream: stderrType,
stream: runtimeapi.Stderr,
expectStderr: log,
},
{ // stdout log
stream: stdoutType,
stream: runtimeapi.Stdout,
expectStdout: log,
},
{ // since is after timestamp
stream: stdoutType,
stream: runtimeapi.Stdout,
since: timestamp.Add(1 * time.Second),
},
{ // timestamp enabled
stream: stderrType,
stream: runtimeapi.Stderr,
timestamp: true,
expectStderr: timestamp.Format(timeFormat) + " " + log,
},
@ -226,13 +243,13 @@ func TestWriteLogsWithBytesLimit(t *testing.T) {
stderrBuf := bytes.NewBuffer(nil)
w := newLogWriter(stdoutBuf, stderrBuf, &LogOptions{timestamp: test.timestamp, bytes: int64(test.bytes)})
for i := 0; i < test.stdoutLines; i++ {
msg.stream = stdoutType
msg.stream = runtimeapi.Stdout
if err := w.write(msg); err != nil {
assert.EqualError(t, err, errMaximumWrite.Error())
}
}
for i := 0; i < test.stderrLines; i++ {
msg.stream = stderrType
msg.stream = runtimeapi.Stderr
if err := w.write(msg); err != nil {
assert.EqualError(t, err, errMaximumWrite.Error())
}