From 40b38f09d9b3f2cb3de7037758c3a40c7b108c37 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 26 Nov 2021 10:46:29 +0100 Subject: [PATCH 1/7] json: more flexible constructor The encoder configuration can now be chosen by the caller. This will be used by a benchmark to write messages without caller and time stamp. While at it, some places where the logger was unnecessarily tested with split output streams writing into the same actual stream were replaced with writing as single stream. This is a leftover from a previous incarnation of the split output stream patch where identical streams were used instead of nil for the error stream to indicate "single stream". --- .../k8s.io/component-base/logs/json/json.go | 29 ++++++++++--------- .../logs/json/json_benchmark_test.go | 6 ++-- .../component-base/logs/json/json_test.go | 10 +++---- .../component-base/logs/json/klog_test.go | 4 +-- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/staging/src/k8s.io/component-base/logs/json/json.go b/staging/src/k8s.io/component-base/logs/json/json.go index 32e66007a48..1adada10cd4 100644 --- a/staging/src/k8s.io/component-base/logs/json/json.go +++ b/staging/src/k8s.io/component-base/logs/json/json.go @@ -36,8 +36,20 @@ var ( // NewJSONLogger creates a new json logr.Logger and its associated // flush function. The separate error stream is optional and may be nil. -func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, func()) { - encoder := zapcore.NewJSONEncoder(encoderConfig) +// The encoder config is also optional. +func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer, encoderConfig *zapcore.EncoderConfig) (logr.Logger, func()) { + if encoderConfig == nil { + encoderConfig = &zapcore.EncoderConfig{ + MessageKey: "msg", + CallerKey: "caller", + TimeKey: "ts", + EncodeTime: epochMillisTimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + } + } + + encoder := zapcore.NewJSONEncoder(*encoderConfig) var core zapcore.Core if errorStream == nil { core = zapcore.NewCore(encoder, infoStream, zapcore.Level(-127)) @@ -59,15 +71,6 @@ func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, fu } } -var encoderConfig = zapcore.EncoderConfig{ - MessageKey: "msg", - CallerKey: "caller", - TimeKey: "ts", - EncodeTime: epochMillisTimeEncoder, - EncodeDuration: zapcore.StringDurationEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, -} - func epochMillisTimeEncoder(_ time.Time, enc zapcore.PrimitiveArrayEncoder) { nanos := timeNow().UnixNano() millis := float64(nanos) / float64(time.Millisecond) @@ -95,8 +98,8 @@ func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { } } // stdout for info messages, stderr for errors. - return NewJSONLogger(stdout, stderr) + return NewJSONLogger(stdout, stderr, nil) } // Write info messages and errors to stderr to prevent mixing with normal program output. - return NewJSONLogger(stderr, nil) + return NewJSONLogger(stderr, nil, nil) } diff --git a/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go b/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go index 7d894896bcd..ca4d6949090 100644 --- a/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go +++ b/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go @@ -26,7 +26,7 @@ import ( var writer = zapcore.AddSync(&writeSyncer{}) func BenchmarkInfoLoggerInfo(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -55,7 +55,7 @@ func BenchmarkInfoLoggerInfo(b *testing.B) { } func BenchmarkZapLoggerError(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -85,7 +85,7 @@ func BenchmarkZapLoggerError(b *testing.B) { } func BenchmarkZapLoggerV(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { diff --git a/staging/src/k8s.io/component-base/logs/json/json_test.go b/staging/src/k8s.io/component-base/logs/json/json_test.go index 75edaad0986..a360d10c934 100644 --- a/staging/src/k8s.io/component-base/logs/json/json_test.go +++ b/staging/src/k8s.io/component-base/logs/json/json_test.go @@ -64,7 +64,7 @@ func TestZapLoggerInfo(t *testing.T) { for _, data := range testDataInfo { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.Info(data.msg, data.keysValues...) logStr := buffer.String() @@ -94,7 +94,7 @@ func TestZapLoggerInfo(t *testing.T) { // TestZapLoggerEnabled test ZapLogger enabled func TestZapLoggerEnabled(t *testing.T) { - sampleInfoLogger, _ := NewJSONLogger(nil, nil) + sampleInfoLogger, _ := NewJSONLogger(nil, nil, nil) for i := 0; i < 11; i++ { if !sampleInfoLogger.V(i).Enabled() { t.Errorf("V(%d).Info should be enabled", i) @@ -111,7 +111,7 @@ func TestZapLoggerV(t *testing.T) { for i := 0; i < 11; i++ { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond) logStr := buffer.String() var v, lineNo int @@ -138,7 +138,7 @@ func TestZapLoggerError(t *testing.T) { timeNow = func() time.Time { return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC) } - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond) logStr := buffer.String() var ts float64 @@ -156,7 +156,7 @@ func TestZapLoggerError(t *testing.T) { func TestZapLoggerStreams(t *testing.T) { var infoBuffer, errorBuffer bytes.Buffer - log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer)) + log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer), nil) log.Error(fmt.Errorf("some error"), "failed") log.Info("hello world") diff --git a/staging/src/k8s.io/component-base/logs/json/klog_test.go b/staging/src/k8s.io/component-base/logs/json/klog_test.go index ae6441ff3cf..19c26590e6c 100644 --- a/staging/src/k8s.io/component-base/logs/json/klog_test.go +++ b/staging/src/k8s.io/component-base/logs/json/klog_test.go @@ -239,7 +239,7 @@ func TestKlogIntegration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) klog.SetLogger(logger) defer klog.ClearLogger() @@ -270,7 +270,7 @@ func TestKlogIntegration(t *testing.T) { func TestKlogV(t *testing.T) { var buffer testBuff writer := zapcore.AddSync(&buffer) - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) klog.SetLogger(logger) defer klog.ClearLogger() fs := flag.FlagSet{} From 8e2f03d3366a8d7fdc6f398a20ef16b2d3a36fe8 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 19 Nov 2021 20:46:00 +0100 Subject: [PATCH 2/7] logs: add benchmark The benchmark reads a JSON log file and measures how long it takes to re-encode it. The focus is on the encoding of message and values, therefore additional work (time stamping, caller, writing to file) gets avoided. --- staging/src/k8s.io/component-base/go.mod | 1 + .../component-base/logs/benchmark/README.md | 43 +++ .../logs/benchmark/benchmark_test.go | 108 +++++++ .../logs/benchmark/common_test.go | 79 +++++ .../logs/benchmark/data/container.log | 2 + .../logs/benchmark/data/error-value.log | 1 + .../logs/benchmark/data/error.log | 1 + .../logs/benchmark/data/simple.log | 1 + .../logs/benchmark/data/values.log | 1 + .../component-base/logs/benchmark/load.go | 269 ++++++++++++++++++ .../logs/benchmark/load_test.go | 221 ++++++++++++++ 11 files changed, 727 insertions(+) create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/README.md create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/common_test.go create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/data/container.log create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/data/error.log create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/data/simple.log create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/data/values.log create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/load.go create mode 100644 staging/src/k8s.io/component-base/logs/benchmark/load_test.go diff --git a/staging/src/k8s.io/component-base/go.mod b/staging/src/k8s.io/component-base/go.mod index f0e79049687..c4bc0d41a03 100644 --- a/staging/src/k8s.io/component-base/go.mod +++ b/staging/src/k8s.io/component-base/go.mod @@ -28,6 +28,7 @@ require ( golang.org/x/tools v0.1.8 // indirect google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect gotest.tools/v3 v3.0.3 // indirect + k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 k8s.io/client-go v0.0.0 k8s.io/klog/v2 v2.40.1 diff --git a/staging/src/k8s.io/component-base/logs/benchmark/README.md b/staging/src/k8s.io/component-base/logs/benchmark/README.md new file mode 100644 index 00000000000..d68a9a8deda --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/README.md @@ -0,0 +1,43 @@ +# Benchmarking logging + +Any major changes to the logging code, whether it is in Kubernetes or in klog, +must be benchmarked before and after the change. + +## Running the benchmark + +``` +$ go test -bench=. -test.benchmem -benchmem . +``` + +## Real log data + +The files under `data` define test cases for specific aspects of formatting. To +test with a log file that represents output under some kind of real load, copy +the log file into `data/.log` and run benchmarking as described +above. `-bench=BenchmarkLogging/` can be used +to benchmark just the new file. + +When using `data/v/.log`, formatting will be done at +that log level. Symlinks can be created to simulating writing of the same log +data at different levels. + +No such real data is included in the Kubernetes repo because of their size. +They can be found in the "artifacts" of this +https://testgrid.kubernetes.io/sig-instrumentation-tests#kind-json-logging-master +Prow job: +- `artifacts/logs/kind-control-plane/containers` +- `artifacts/logs/kind-*/kubelet.log` + +With sufficient credentials, `gsutil` can be used to download everything for a job with: +``` +gsutil -m cp -R gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/ . +``` + +## Analyzing log data + +While loading a file, some statistics about it are collected. Those are shown +when running with: + +``` +$ go test -v -bench=. -test.benchmem -benchmem . +``` diff --git a/staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go b/staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go new file mode 100644 index 00000000000..ba15085410d --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "fmt" + "io/fs" + "path/filepath" + "regexp" + "strconv" + "strings" + "testing" + + "k8s.io/klog/v2" +) + +func BenchmarkLogging(b *testing.B) { + // Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log + // messages. We generate one sub-benchmark for each file where logging + // is tested with the log level from the directory. Symlinks can be + // used to test the same file with different levels. + if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if !strings.HasSuffix(path, ".log") { + return nil + } + messages, stats, err := loadLog(path) + if err != nil { + return err + } + if info.Mode()&fs.ModeSymlink == 0 { + b.Log(path + "\n" + stats.String()) + } + b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) { + // Take verbosity threshold from directory, if present. + vMatch := regexp.MustCompile(`/v(\d+)/`).FindStringSubmatch(path) + v := 0 + if vMatch != nil { + v, _ = strconv.Atoi(vMatch[1]) + } + fileSizes := map[string]int{} + b.Run("stats", func(b *testing.B) { + // Nothing to do. Use this for "go test -v + // -bench=BenchmarkLogging/.*/stats" to print + // just the statistics. + }) + b.Run("printf", func(b *testing.B) { + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + printf(item) + } + } + } + fileSizes["printf"] = int(output) / b.N + }) + b.Run("structured", func(b *testing.B) { + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + prints(item) + } + } + } + fileSizes["structured"] = int(output) / b.N + }) + b.Run("JSON", func(b *testing.B) { + klog.SetLogger(jsonLogger) + defer klog.ClearLogger() + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + prints(item) + } + } + } + fileSizes["JSON"] = int(output) / b.N + }) + + b.Log(fmt.Sprintf("file sizes: %v\n", fileSizes)) + }) + return nil + }); err != nil { + b.Fatalf("reading 'data' directory: %v", err) + } +} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/common_test.go b/staging/src/k8s.io/component-base/logs/benchmark/common_test.go new file mode 100644 index 00000000000..7fee16c76fe --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/common_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "flag" + "io" + + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + + logsjson "k8s.io/component-base/logs/json" + "k8s.io/klog/v2" +) + +func init() { + // Cause all klog output to be discarded with minimal overhead. + // We don't include time stamps and caller information. + klog.InitFlags(nil) + flag.Set("alsologtostderr", "false") + flag.Set("logtostderr", "false") + flag.Set("skip_headers", "true") + flag.Set("one_output", "true") + flag.Set("stderrthreshold", "FATAL") + klog.SetOutput(&output) +} + +type bytesWritten int64 + +func (b *bytesWritten) Write(data []byte) (int, error) { + l := len(data) + *b += bytesWritten(l) + return l, nil +} + +func (b *bytesWritten) Sync() error { + return nil +} + +var output bytesWritten +var jsonLogger = newJSONLogger(&output) + +func newJSONLogger(out io.Writer) logr.Logger { + encoderConfig := &zapcore.EncoderConfig{ + MessageKey: "msg", + } + logger, _ := logsjson.NewJSONLogger(zapcore.AddSync(out), nil, encoderConfig) + return logger +} + +func printf(item logMessage) { + if item.isError { + klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs) + } else { + klog.Infof("%s: %v", item.msg, item.kvs) + } +} + +func prints(item logMessage) { + if item.isError { + klog.ErrorS(item.err, item.msg, item.kvs...) + } else { + klog.InfoS(item.msg, item.kvs...) + } +} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/container.log b/staging/src/k8s.io/component-base/logs/benchmark/data/container.log new file mode 100644 index 00000000000..4f8a6676f73 --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/data/container.log @@ -0,0 +1,2 @@ +# This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one. +Nov 19 02:13:48 kind-worker2 kubelet[250]: {"ts":1637288028968.0125,"caller":"kuberuntime/kuberuntime_manager.go:902","msg":"Creating container in pod","v":0,"container":{"Name":"terminate-cmd-rpn","Image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","Command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"TerminationMessagePath":"/dev/termination-log"}} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log b/staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log new file mode 100644 index 00000000000..9c5bd4ced5d --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log @@ -0,0 +1 @@ +{"v":0, "msg": "Pod status update", "err": "failed"} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/error.log b/staging/src/k8s.io/component-base/logs/benchmark/data/error.log new file mode 100644 index 00000000000..114dd2a45ab --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/data/error.log @@ -0,0 +1 @@ +{"msg": "Pod status update", "err": "failed"} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/simple.log b/staging/src/k8s.io/component-base/logs/benchmark/data/simple.log new file mode 100644 index 00000000000..2a4345b3bbe --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/data/simple.log @@ -0,0 +1 @@ +{"v": 0, "msg": "Pod status updated"} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/values.log b/staging/src/k8s.io/component-base/logs/benchmark/data/values.log new file mode 100644 index 00000000000..f7c20c745f7 --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/data/values.log @@ -0,0 +1 @@ +{"v":0, "msg": "Example", "someValue": 1, "someString": "hello world", "pod": {"namespace": "system", "name": "kube-scheduler"}, "pv": {"name": "volume"}} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/load.go b/staging/src/k8s.io/component-base/logs/benchmark/load.go new file mode 100644 index 00000000000..ccb5727b146 --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/load.go @@ -0,0 +1,269 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "os" + "reflect" + "regexp" + "sort" + "strings" + "text/template" + + "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +type logMessage struct { + msg string + verbosity int + err error + isError bool + kvs []interface{} +} + +const ( + stringArg = "string" + multiLineStringArg = "multiLineString" + objectStringArg = "objectString" + numberArg = "number" + krefArg = "kref" + otherArg = "other" + totalArg = "total" +) + +type logStats struct { + TotalLines, JsonLines, ErrorMessages int + + ArgCounts map[string]int + OtherLines []string + OtherArgs []interface{} + MultiLineArgs [][]string + ObjectTypes map[string]int +} + +var ( + logStatsTemplate = template.Must(template.New("format").Funcs(template.FuncMap{ + "percent": func(x, y int) string { + if y == 0 { + return "NA" + } + return fmt.Sprintf("%d%%", x*100/y) + }, + "sub": func(x, y int) int { + return x - y + }, + }).Parse(`Total number of lines: {{.TotalLines}} +Valid JSON messages: {{.JsonLines}} ({{percent .JsonLines .TotalLines}} of total lines) +Error messages: {{.ErrorMessages}} ({{percent .ErrorMessages .JsonLines}} of valid JSON messages) +Unrecognized lines: {{sub .TotalLines .JsonLines}} +{{range .OtherLines}} {{.}} +{{end}} +Args: + total: {{if .ArgCounts.total}}{{.ArgCounts.total}}{{else}}0{{end}}{{if .ArgCounts.string}} + strings: {{.ArgCounts.string}} ({{percent .ArgCounts.string .ArgCounts.total}}){{end}} {{if .ArgCounts.multiLineString}} + with line breaks: {{.ArgCounts.multiLineString}} ({{percent .ArgCounts.multiLineString .ArgCounts.total}} of all arguments) + {{range .MultiLineArgs}} ===== {{index . 0}} ===== +{{index . 1}} + +{{end}}{{end}}{{if .ArgCounts.objectString}} + with API objects: {{.ArgCounts.objectString}} ({{percent .ArgCounts.objectString .ArgCounts.total}} of all arguments) + types and their number of usage:{{range $key, $value := .ObjectTypes}} {{ $key }}:{{ $value }}{{end}}{{end}}{{if .ArgCounts.number}} + numbers: {{.ArgCounts.number}} ({{percent .ArgCounts.number .ArgCounts.total}}){{end}}{{if .ArgCounts.kref}} + ObjectRef: {{.ArgCounts.kref}} ({{percent .ArgCounts.kref .ArgCounts.total}}){{end}}{{if .ArgCounts.other}} + others: {{.ArgCounts.other}} ({{percent .ArgCounts.other .ArgCounts.total}}){{end}} +`)) +) + +// This produces too much output: +// {{range .OtherArgs}} {{.}} +// {{end}} + +// Doesn't work? +// Unrecognized lines: {{with $delta := sub .TotalLines .JsonLines}}{{$delta}} ({{percent $delta .TotalLines}} of total lines){{end}} + +func (s logStats) String() string { + var buffer bytes.Buffer + err := logStatsTemplate.Execute(&buffer, &s) + if err != nil { + return err.Error() + } + return buffer.String() +} + +func loadLog(path string) (messages []logMessage, stats logStats, err error) { + file, err := os.Open(path) + if err != nil { + return nil, logStats{}, err + } + defer file.Close() + + stats.ArgCounts = map[string]int{} + scanner := bufio.NewScanner(file) + for lineNo := 0; scanner.Scan(); lineNo++ { + line := scanner.Bytes() + msg, err := parseLine(line, &stats) + if err != nil { + stats.OtherLines = append(stats.OtherLines, fmt.Sprintf("%d: %s", lineNo, string(line))) + continue + } + messages = append(messages, msg) + } + + if err := scanner.Err(); err != nil { + return nil, logStats{}, fmt.Errorf("reading %s failed: %v", path, err) + } + + return +} + +// systemd prefix: +// Nov 19 02:08:51 kind-worker2 kubelet[250]: {"ts":1637287731687.8315,... +// +// kubectl (?) prefix: +// 2021-11-19T02:08:28.475825534Z stderr F {"ts": ... +var prefixRE = regexp.MustCompile(`^\w+ \d+ \S+ \S+ \S+: |\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z stderr . `) + +// String format for API structs from generated.pb.go. +// &Container{...} +var objectRE = regexp.MustCompile(`^&([a-zA-Z]*)\{`) + +func parseLine(line []byte, stats *logStats) (item logMessage, err error) { + stats.TotalLines++ + line = prefixRE.ReplaceAll(line, nil) + + content := map[string]interface{}{} + if err := json.Unmarshal(line, &content); err != nil { + return logMessage{}, fmt.Errorf("JSON parsing failed: %v", err) + } + stats.JsonLines++ + + kvs := map[string]interface{}{} + item.isError = true + for key, value := range content { + switch key { + case "v": + verbosity, ok := value.(float64) + if !ok { + return logMessage{}, fmt.Errorf("expected number for v, got: %T %v", value, value) + } + item.verbosity = int(verbosity) + item.isError = false + case "msg": + msg, ok := value.(string) + if !ok { + return logMessage{}, fmt.Errorf("expected string for msg, got: %T %v", value, value) + } + item.msg = msg + case "ts", "caller": + // ignore + case "err": + errStr, ok := value.(string) + if !ok { + return logMessage{}, fmt.Errorf("expected string for err, got: %T %v", value, value) + } + item.err = errors.New(errStr) + stats.ArgCounts[stringArg]++ + stats.ArgCounts[totalArg]++ + default: + if obj := toObject(value); obj != nil { + value = obj + } + switch value := value.(type) { + case string: + stats.ArgCounts[stringArg]++ + if strings.Contains(value, "\n") { + stats.ArgCounts[multiLineStringArg]++ + stats.MultiLineArgs = append(stats.MultiLineArgs, []string{key, value}) + } + match := objectRE.FindStringSubmatch(value) + if match != nil { + if stats.ObjectTypes == nil { + stats.ObjectTypes = map[string]int{} + } + stats.ArgCounts[objectStringArg]++ + stats.ObjectTypes[match[1]]++ + } + case float64: + stats.ArgCounts[numberArg]++ + case klog.ObjectRef: + stats.ArgCounts[krefArg]++ + default: + stats.ArgCounts[otherArg]++ + stats.OtherArgs = append(stats.OtherArgs, value) + } + stats.ArgCounts[totalArg]++ + kvs[key] = value + } + } + + // Sort by key. + var keys []string + for key := range kvs { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + item.kvs = append(item.kvs, key, kvs[key]) + } + + if !item.isError && item.err != nil { + // Error is a normal key/value. + item.kvs = append(item.kvs, "err", item.err) + item.err = nil + } + if item.isError { + stats.ErrorMessages++ + } + return +} + +// This is a list of objects that might have been dumped. The simple ones must +// come first because unmarshaling will try one after the after and an +// ObjectRef would unmarshal fine into any of the others whereas any of the +// other types hopefully have enough extra fields that they won't fit (unknown +// fields are an error). +var objectTypes = []reflect.Type{ + reflect.TypeOf(klog.ObjectRef{}), + reflect.TypeOf(&v1.Pod{}), + reflect.TypeOf(&v1.Container{}), +} + +func toObject(value interface{}) interface{} { + data, ok := value.(map[string]interface{}) + if !ok { + return nil + } + jsonData, err := json.Marshal(data) + if err != nil { + return nil + } + for _, t := range objectTypes { + obj := reflect.New(t) + decoder := json.NewDecoder(bytes.NewBuffer(jsonData)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(obj.Interface()); err == nil { + return reflect.Indirect(obj).Interface() + } + } + return nil +} diff --git a/staging/src/k8s.io/component-base/logs/benchmark/load_test.go b/staging/src/k8s.io/component-base/logs/benchmark/load_test.go new file mode 100644 index 00000000000..f6f3b33a5db --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/benchmark/load_test.go @@ -0,0 +1,221 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "bytes" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +func TestData(t *testing.T) { + container := v1.Container{ + Command: []string{"sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"}, + Image: "k8s.gcr.io/e2e-test-images/busybox:1.29-2", + Name: "terminate-cmd-rpn", + TerminationMessagePath: "/dev/termination-log", + } + + testcases := map[string]struct { + messages []logMessage + printf, structured, json string + stats logStats + }{ + "data/simple.log": { + messages: []logMessage{ + { + msg: "Pod status updated", + }, + }, + printf: `Pod status updated: [] +`, + structured: `"Pod status updated" +`, + json: `{"msg":"Pod status updated","v":0} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{}, + }, + }, + "data/error.log": { + messages: []logMessage{ + { + msg: "Pod status update", + err: errors.New("failed"), + isError: true, + }, + }, + printf: `Pod status update: failed [] +`, + structured: `"Pod status update" err="failed" +`, + json: `{"msg":"Pod status update","err":"failed"} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ErrorMessages: 1, + ArgCounts: map[string]int{ + stringArg: 1, + totalArg: 1, + }, + }, + }, + "data/error-value.log": { + messages: []logMessage{ + { + msg: "Pod status update", + kvs: []interface{}{"err", errors.New("failed")}, + }, + }, + printf: `Pod status update: [err failed] +`, + structured: `"Pod status update" err="failed" +`, + json: `{"msg":"Pod status update","v":0,"err":"failed"} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{ + stringArg: 1, + totalArg: 1, + }, + }, + }, + "data/values.log": { + messages: []logMessage{ + { + msg: "Example", + kvs: []interface{}{ + "pod", klog.KRef("system", "kube-scheduler"), + "pv", klog.KRef("", "volume"), + "someString", "hello world", + "someValue", 1.0, + }, + }, + }, + printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1] +`, + structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1 +`, + json: `{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{ + stringArg: 1, + krefArg: 2, + numberArg: 1, + totalArg: 4, + }, + }, + }, + "data/container.log": { + messages: []logMessage{ + { + msg: "Creating container in pod", + kvs: []interface{}{ + "container", &container, + }, + }, + }, + printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c +f=/restart-count/restartCount +count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) +if [ $count -eq 1 ]; then + exit 1 +fi +if [ $count -eq 2 ]; then + exit 0 +fi +while true; do sleep 1; done +],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}] +`, + structured: `"Creating container in pod" container="&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}" +`, + // This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead. + // json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}} + // `, + json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}"} +`, + stats: logStats{ + TotalLines: 2, + JsonLines: 1, + ArgCounts: map[string]int{ + totalArg: 1, + otherArg: 1, + }, + OtherLines: []string{ + "0: # This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.", + }, + OtherArgs: []interface{}{ + &container, + }, + }, + }, + } + + for path, expected := range testcases { + t.Run(path, func(t *testing.T) { + messages, stats, err := loadLog(path) + if err != nil { + t.Fatalf("unexpected load error: %v", err) + } + assert.Equal(t, expected.messages, messages) + assert.Equal(t, expected.stats, stats) + print := func(format func(item logMessage)) { + for _, item := range expected.messages { + format(item) + } + } + testBuffered := func(t *testing.T, expected string, format func(item logMessage)) { + var buffer bytes.Buffer + klog.SetOutput(&buffer) + defer klog.SetOutput(&output) + + print(format) + klog.Flush() + assert.Equal(t, expected, buffer.String()) + } + + t.Run("printf", func(t *testing.T) { + testBuffered(t, expected.printf, printf) + }) + t.Run("structured", func(t *testing.T) { + testBuffered(t, expected.structured, prints) + }) + t.Run("json", func(t *testing.T) { + var buffer bytes.Buffer + logger := newJSONLogger(&buffer) + klog.SetLogger(logger) + defer klog.ClearLogger() + print(prints) + klog.Flush() + assert.Equal(t, expected.json, buffer.String()) + }) + }) + } +} From 072859c967ebdb9f4c04c59d174bcd0ce66ae8f6 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 30 Nov 2021 14:53:46 +0100 Subject: [PATCH 3/7] logs: create separate test/integration directory The benchmark depends on k8s.io/api (for v1.Container). Such a dependency is not desirable for k8s.io/component-base/logs, even if it's just for testing. The solution is to create a separate directory where such a dependency isn't a problem. The alternative, a separate package with its own go.mod file under k8s.io/component-base/logs wouldd have been more complicated to maintain (yet another go.mod file and different whitelisted dependencies). --- go.mod | 2 ++ staging/src/k8s.io/component-base/go.mod | 1 - test/integration/logs/OWNERS | 11 +++++++++++ .../integration}/logs/benchmark/README.md | 0 .../integration}/logs/benchmark/benchmark_test.go | 0 .../integration}/logs/benchmark/common_test.go | 0 .../integration}/logs/benchmark/data/container.log | 0 .../integration}/logs/benchmark/data/error-value.log | 0 .../integration}/logs/benchmark/data/error.log | 0 .../integration}/logs/benchmark/data/simple.log | 0 .../integration}/logs/benchmark/data/values.log | 0 .../integration}/logs/benchmark/load.go | 0 .../integration}/logs/benchmark/load_test.go | 0 vendor/modules.txt | 2 ++ 14 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 test/integration/logs/OWNERS rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/README.md (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/benchmark_test.go (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/common_test.go (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/data/container.log (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/data/error-value.log (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/data/error.log (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/data/simple.log (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/data/values.log (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/load.go (100%) rename {staging/src/k8s.io/component-base => test/integration}/logs/benchmark/load_test.go (100%) diff --git a/go.mod b/go.mod index 74611b0a9fd..c33fcc54f96 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/emicklei/go-restful v2.9.5+incompatible github.com/evanphx/json-patch v4.12.0+incompatible github.com/fsnotify/fsnotify v1.4.9 + github.com/go-logr/logr v1.2.0 github.com/go-ozzo/ozzo-validation v3.5.0+incompatible // indirect github.com/godbus/dbus/v5 v5.0.4 github.com/gogo/protobuf v1.3.2 @@ -82,6 +83,7 @@ require ( go.opentelemetry.io/otel/sdk v0.20.0 go.opentelemetry.io/otel/trace v0.20.0 go.opentelemetry.io/proto/otlp v0.7.0 + go.uber.org/zap v1.19.0 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 diff --git a/staging/src/k8s.io/component-base/go.mod b/staging/src/k8s.io/component-base/go.mod index c4bc0d41a03..f0e79049687 100644 --- a/staging/src/k8s.io/component-base/go.mod +++ b/staging/src/k8s.io/component-base/go.mod @@ -28,7 +28,6 @@ require ( golang.org/x/tools v0.1.8 // indirect google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect gotest.tools/v3 v3.0.3 // indirect - k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 k8s.io/client-go v0.0.0 k8s.io/klog/v2 v2.40.1 diff --git a/test/integration/logs/OWNERS b/test/integration/logs/OWNERS new file mode 100644 index 00000000000..2d8babf9531 --- /dev/null +++ b/test/integration/logs/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-instrumentation-approvers + - serathius + - pohly +reviewers: + - sig-instrumentation-reviewers +labels: + - sig/instrumentation + - wg/structured-logging diff --git a/staging/src/k8s.io/component-base/logs/benchmark/README.md b/test/integration/logs/benchmark/README.md similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/README.md rename to test/integration/logs/benchmark/README.md diff --git a/staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/benchmark_test.go rename to test/integration/logs/benchmark/benchmark_test.go diff --git a/staging/src/k8s.io/component-base/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/common_test.go rename to test/integration/logs/benchmark/common_test.go diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/container.log b/test/integration/logs/benchmark/data/container.log similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/data/container.log rename to test/integration/logs/benchmark/data/container.log diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log b/test/integration/logs/benchmark/data/error-value.log similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/data/error-value.log rename to test/integration/logs/benchmark/data/error-value.log diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/error.log b/test/integration/logs/benchmark/data/error.log similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/data/error.log rename to test/integration/logs/benchmark/data/error.log diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/simple.log b/test/integration/logs/benchmark/data/simple.log similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/data/simple.log rename to test/integration/logs/benchmark/data/simple.log diff --git a/staging/src/k8s.io/component-base/logs/benchmark/data/values.log b/test/integration/logs/benchmark/data/values.log similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/data/values.log rename to test/integration/logs/benchmark/data/values.log diff --git a/staging/src/k8s.io/component-base/logs/benchmark/load.go b/test/integration/logs/benchmark/load.go similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/load.go rename to test/integration/logs/benchmark/load.go diff --git a/staging/src/k8s.io/component-base/logs/benchmark/load_test.go b/test/integration/logs/benchmark/load_test.go similarity index 100% rename from staging/src/k8s.io/component-base/logs/benchmark/load_test.go rename to test/integration/logs/benchmark/load_test.go diff --git a/vendor/modules.txt b/vendor/modules.txt index 1110cdda076..47d6ca2d61c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -254,6 +254,7 @@ github.com/fvbommel/sortorder # github.com/go-errors/errors v1.0.1 => github.com/go-errors/errors v1.0.1 github.com/go-errors/errors # github.com/go-logr/logr v1.2.0 => github.com/go-logr/logr v1.2.0 +## explicit github.com/go-logr/logr # github.com/go-logr/zapr v1.2.0 => github.com/go-logr/zapr v1.2.0 github.com/go-logr/zapr @@ -932,6 +933,7 @@ go.uber.org/atomic # go.uber.org/multierr v1.6.0 => go.uber.org/multierr v1.6.0 go.uber.org/multierr # go.uber.org/zap v1.19.0 => go.uber.org/zap v1.19.0 +## explicit go.uber.org/zap go.uber.org/zap/buffer go.uber.org/zap/internal/bufferpool From b8501fc10bf09755b3aa1cc91391c10d264ab694 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 30 Nov 2021 16:29:32 +0100 Subject: [PATCH 4/7] logs: work around logcheck logcheck complains: Additional arguments to ErrorS should always be Key Value pairs. Please check if there is any key or value missing. That check is intentional, but not applicable here. The check can be worked around by calling the functions through variables. --- test/integration/logs/benchmark/common_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index 7fee16c76fe..b365c475823 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -70,10 +70,17 @@ func printf(item logMessage) { } } +// These variables are a workaround for logcheck complaining about the dynamic +// parameters. +var ( + errorS = klog.ErrorS + infoS = klog.InfoS +) + func prints(item logMessage) { if item.isError { - klog.ErrorS(item.err, item.msg, item.kvs...) + errorS(item.err, item.msg, item.kvs...) } else { - klog.InfoS(item.msg, item.kvs...) + infoS(item.msg, item.kvs...) } } From 9a867c555c647dfd4d6324343592703dd6777c69 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 16 Dec 2021 12:34:17 +0100 Subject: [PATCH 5/7] logs: benchmark write performance The recent regression https://github.com/kubernetes/kubernetes/issues/107033 shows that we need a way to automatically measure different logging configurations (structured text, JSON with and without split streams) under realistic conditions (time stamping, caller identification). System calls may affect the performance and thus writing into actual files is useful. A temp dir under /tmp (usually a tmpfs) is used, so the actual IO bandwidth shouldn't affect the outcome. The "normal" json.Factory code is used to construct the JSON logger when we have actual files that can be set as os.Stderr and os.Stdout, thus making this as realistic as possible. When discarding the output instead of writing it, the focus is more on the rest of the pipeline and changes there can be investigated more reliably. The benchmarks automatically gather "log entries per second" and "bytes per second", which is useful to know when considering requirements like the ones from https://github.com/kubernetes/kubernetes/issues/107029. --- .../logs/benchmark/benchmark_test.go | 187 +++++++++++++++++- .../integration/logs/benchmark/common_test.go | 2 + 2 files changed, 188 insertions(+), 1 deletion(-) diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index ba15085410d..2312a485c4c 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -17,18 +17,29 @@ limitations under the License. package benchmark import ( + "errors" + "flag" "fmt" + "io" "io/fs" + "os" + "path" "path/filepath" "regexp" "strconv" "strings" + "sync" "testing" + "time" + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + "k8s.io/component-base/logs" + logsjson "k8s.io/component-base/logs/json" "k8s.io/klog/v2" ) -func BenchmarkLogging(b *testing.B) { +func BenchmarkEncoding(b *testing.B) { // Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log // messages. We generate one sub-benchmark for each file where logging // is tested with the log level from the directory. Symlinks can be @@ -106,3 +117,177 @@ func BenchmarkLogging(b *testing.B) { b.Fatalf("reading 'data' directory: %v", err) } } + +type loadGeneratorConfig struct { + // Length of the message written in each log entry. + messageLength int + + // Percentage of error log entries written. + errorPercentage float64 + + // Number of concurrent goroutines that generate log entries. + workers int +} + +// BenchmarkWriting simulates writing of a stream which mixes info and error log +// messages at a certain ratio. In contrast to BenchmarkEncoding, this stresses +// the output handling and includes the usual additional information (caller, +// time stamp). +// +// See https://github.com/kubernetes/kubernetes/issues/107029 for the +// motivation. +func BenchmarkWriting(b *testing.B) { + flag.Set("skip_headers", "false") + defer flag.Set("skip_headers", "true") + + // This could be made configurable and/or we could benchmark different + // configurations automatically. + config := loadGeneratorConfig{ + messageLength: 300, + errorPercentage: 1.0, + workers: 100, + } + + benchmarkWriting(b, config) +} + +func benchmarkWriting(b *testing.B, config loadGeneratorConfig) { + b.Run("discard", func(b *testing.B) { + benchmarkOutputFormats(b, config, true) + }) + b.Run("tmp-files", func(b *testing.B) { + benchmarkOutputFormats(b, config, false) + }) +} + +func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) { + tmpDir := b.TempDir() + b.Run("structured", func(b *testing.B) { + var out *os.File + if !discard { + var err error + out, err = os.Create(path.Join(tmpDir, "all.log")) + if err != nil { + b.Fatal(err) + } + klog.SetOutput(out) + defer klog.SetOutput(&output) + } + generateOutput(b, config, out) + }) + b.Run("JSON", func(b *testing.B) { + var logger logr.Logger + var out1, out2 *os.File + if !discard { + var err error + out1, err = os.Create(path.Join(tmpDir, "stream-1.log")) + if err != nil { + b.Fatal(err) + } + defer out1.Close() + out2, err = os.Create(path.Join(tmpDir, "stream-2.log")) + if err != nil { + b.Fatal(err) + } + defer out2.Close() + } + b.Run("single-stream", func(b *testing.B) { + if discard { + logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), nil, nil) + } else { + stderr := os.Stderr + os.Stderr = out1 + defer func() { + os.Stderr = stderr + }() + options := logs.NewOptions() + logger, _ = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, out1) + }) + + b.Run("split-stream", func(b *testing.B) { + if discard { + logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), zapcore.AddSync(&output), nil) + } else { + stdout, stderr := os.Stdout, os.Stderr + os.Stdout, os.Stderr = out1, out2 + defer func() { + os.Stdout, os.Stderr = stdout, stderr + }() + options := logs.NewOptions() + options.Config.Options.JSON.SplitStream = true + logger, _ = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, out1, out2) + }) + }) +} + +func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) { + msg := strings.Repeat("X", config.messageLength) + err := errors.New("fail") + start := time.Now() + + // Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low. + n := b.N * 1000 + + b.ResetTimer() + var wg sync.WaitGroup + for i := 0; i < config.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + acc := 0.0 + for i := 0; i < n; i++ { + if acc > 100 { + klog.ErrorS(err, msg, "key", "value") + acc -= 100 + } else { + klog.InfoS(msg, "key", "value") + } + acc += config.errorPercentage + } + }() + } + wg.Wait() + klog.Flush() + b.StopTimer() + + // Print some information about the result. + end := time.Now() + duration := end.Sub(start) + total := n * config.workers + b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds()) + for i, file := range files { + if file != nil { + pos, err := file.Seek(0, os.SEEK_END) + if err != nil { + b.Fatal(err) + } + if _, err := file.Seek(0, os.SEEK_SET); err != nil { + b.Fatal(err) + } + max := 50 + buffer := make([]byte, max) + actual, err := file.Read(buffer) + if err != nil { + if err != io.EOF { + b.Fatal(err) + } + buffer = nil + } + if actual == max { + buffer[max-3] = '.' + buffer[max-2] = '.' + buffer[max-1] = '.' + } + b.Logf(" %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", pos, i, float64(pos)/duration.Seconds()/1024/1024, string(buffer)) + } + } +} diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index b365c475823..9adcf50b014 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -30,6 +30,8 @@ import ( func init() { // Cause all klog output to be discarded with minimal overhead. // We don't include time stamps and caller information. + // Individual tests can change that by calling flag.Set again, + // but should always restore this state here. klog.InitFlags(nil) flag.Set("alsologtostderr", "false") flag.Set("logtostderr", "false") From 25c646cbdd90bd890fcc3fc3b350737ecccbdc19 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 10 Jan 2022 16:07:30 +0100 Subject: [PATCH 6/7] json: never call fsync for stdout or stderr We don't need to worry about data loss once the data has been written to an output stream. Calling fsync unnecessarily has been the reason for performance issues in the past. --- .../k8s.io/component-base/logs/json/json.go | 26 +++++++++++++++++-- .../logs/benchmark/benchmark_test.go | 21 ++++++++------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/component-base/logs/json/json.go b/staging/src/k8s.io/component-base/logs/json/json.go index 1adada10cd4..a39a60dcd05 100644 --- a/staging/src/k8s.io/component-base/logs/json/json.go +++ b/staging/src/k8s.io/component-base/logs/json/json.go @@ -17,6 +17,7 @@ limitations under the License. package logs import ( + "io" "os" "time" @@ -83,9 +84,17 @@ type Factory struct{} var _ registry.LogFormatFactory = Factory{} func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { - stderr := zapcore.Lock(os.Stderr) + // We intentionally avoid all os.File.Sync calls. Output is unbuffered, + // therefore we don't need to flush, and calling the underlying fsync + // would just slow down writing. + // + // The assumption is that logging only needs to ensure that data gets + // written to the output stream before the process terminates, but + // doesn't need to worry about data not being written because of a + // system crash or powerloss. + stderr := zapcore.Lock(AddNopSync(os.Stderr)) if options.JSON.SplitStream { - stdout := zapcore.Lock(os.Stdout) + stdout := zapcore.Lock(AddNopSync(os.Stdout)) size := options.JSON.InfoBufferSize.Value() if size > 0 { // Prevent integer overflow. @@ -103,3 +112,16 @@ func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { // Write info messages and errors to stderr to prevent mixing with normal program output. return NewJSONLogger(stderr, nil, nil) } + +// AddNoSync adds a NOP Sync implementation. +func AddNopSync(writer io.Writer) zapcore.WriteSyncer { + return nopSync{Writer: writer} +} + +type nopSync struct { + io.Writer +} + +func (f nopSync) Sync() error { + return nil +} diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index 2312a485c4c..e810e88f3a3 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -33,7 +33,6 @@ import ( "time" "github.com/go-logr/logr" - "go.uber.org/zap/zapcore" "k8s.io/component-base/logs" logsjson "k8s.io/component-base/logs/json" "k8s.io/klog/v2" @@ -173,10 +172,11 @@ func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bo klog.SetOutput(out) defer klog.SetOutput(&output) } - generateOutput(b, config, out) + generateOutput(b, config, nil, out) }) b.Run("JSON", func(b *testing.B) { var logger logr.Logger + var flush func() var out1, out2 *os.File if !discard { var err error @@ -193,7 +193,7 @@ func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bo } b.Run("single-stream", func(b *testing.B) { if discard { - logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), nil, nil) + logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), nil, nil) } else { stderr := os.Stderr os.Stderr = out1 @@ -201,16 +201,16 @@ func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bo os.Stderr = stderr }() options := logs.NewOptions() - logger, _ = logsjson.Factory{}.Create(options.Config.Options) + logger, flush = logsjson.Factory{}.Create(options.Config.Options) } klog.SetLogger(logger) defer klog.ClearLogger() - generateOutput(b, config, out1) + generateOutput(b, config, flush, out1) }) b.Run("split-stream", func(b *testing.B) { if discard { - logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), zapcore.AddSync(&output), nil) + logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), logsjson.AddNopSync(&output), nil) } else { stdout, stderr := os.Stdout, os.Stderr os.Stdout, os.Stderr = out1, out2 @@ -219,16 +219,16 @@ func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bo }() options := logs.NewOptions() options.Config.Options.JSON.SplitStream = true - logger, _ = logsjson.Factory{}.Create(options.Config.Options) + logger, flush = logsjson.Factory{}.Create(options.Config.Options) } klog.SetLogger(logger) defer klog.ClearLogger() - generateOutput(b, config, out1, out2) + generateOutput(b, config, flush, out1, out2) }) }) } -func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) { +func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) { msg := strings.Repeat("X", config.messageLength) err := errors.New("fail") start := time.Now() @@ -257,6 +257,9 @@ func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) } wg.Wait() klog.Flush() + if flush != nil { + flush() + } b.StopTimer() // Print some information about the result. From a5a241e0df93bc3b1c0d6fda92b3f1b04a151df2 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 11 Jan 2022 09:57:03 +0100 Subject: [PATCH 7/7] logs: update expected output for multi-line test case The multiline support has been merged while this benchmark was written. We now get the output that we want, with line breaks. --- test/integration/logs/benchmark/load_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/test/integration/logs/benchmark/load_test.go b/test/integration/logs/benchmark/load_test.go index f6f3b33a5db..496377e0858 100644 --- a/test/integration/logs/benchmark/load_test.go +++ b/test/integration/logs/benchmark/load_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) @@ -154,7 +154,19 @@ fi while true; do sleep 1; done ],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}] `, - structured: `"Creating container in pod" container="&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}" + structured: `"Creating container in pod" container=< + &Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c + f=/restart-count/restartCount + count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) + if [ $count -eq 1 ]; then + exit 1 + fi + if [ $count -eq 2 ]; then + exit 0 + fi + while true; do sleep 1; done + ],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,} + > `, // This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead. // json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}}