diff --git a/go.mod b/go.mod index 113d639a1a4..c33fcc54f96 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( go.opentelemetry.io/otel/sdk v0.20.0 go.opentelemetry.io/otel/trace v0.20.0 go.opentelemetry.io/proto/otlp v0.7.0 + go.uber.org/zap v1.19.0 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 diff --git a/staging/src/k8s.io/component-base/logs/json/json.go b/staging/src/k8s.io/component-base/logs/json/json.go index 32e66007a48..a39a60dcd05 100644 --- a/staging/src/k8s.io/component-base/logs/json/json.go +++ b/staging/src/k8s.io/component-base/logs/json/json.go @@ -17,6 +17,7 @@ limitations under the License. package logs import ( + "io" "os" "time" @@ -36,8 +37,20 @@ var ( // NewJSONLogger creates a new json logr.Logger and its associated // flush function. The separate error stream is optional and may be nil. -func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, func()) { - encoder := zapcore.NewJSONEncoder(encoderConfig) +// The encoder config is also optional. +func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer, encoderConfig *zapcore.EncoderConfig) (logr.Logger, func()) { + if encoderConfig == nil { + encoderConfig = &zapcore.EncoderConfig{ + MessageKey: "msg", + CallerKey: "caller", + TimeKey: "ts", + EncodeTime: epochMillisTimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + } + } + + encoder := zapcore.NewJSONEncoder(*encoderConfig) var core zapcore.Core if errorStream == nil { core = zapcore.NewCore(encoder, infoStream, zapcore.Level(-127)) @@ -59,15 +72,6 @@ func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, fu } } -var encoderConfig = zapcore.EncoderConfig{ - MessageKey: "msg", - CallerKey: "caller", - TimeKey: "ts", - EncodeTime: epochMillisTimeEncoder, - EncodeDuration: zapcore.StringDurationEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, -} - func epochMillisTimeEncoder(_ time.Time, enc zapcore.PrimitiveArrayEncoder) { nanos := timeNow().UnixNano() millis := float64(nanos) / float64(time.Millisecond) @@ -80,9 +84,17 @@ type Factory struct{} var _ registry.LogFormatFactory = Factory{} func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { - stderr := zapcore.Lock(os.Stderr) + // We intentionally avoid all os.File.Sync calls. Output is unbuffered, + // therefore we don't need to flush, and calling the underlying fsync + // would just slow down writing. + // + // The assumption is that logging only needs to ensure that data gets + // written to the output stream before the process terminates, but + // doesn't need to worry about data not being written because of a + // system crash or powerloss. + stderr := zapcore.Lock(AddNopSync(os.Stderr)) if options.JSON.SplitStream { - stdout := zapcore.Lock(os.Stdout) + stdout := zapcore.Lock(AddNopSync(os.Stdout)) size := options.JSON.InfoBufferSize.Value() if size > 0 { // Prevent integer overflow. @@ -95,8 +107,21 @@ func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { } } // stdout for info messages, stderr for errors. - return NewJSONLogger(stdout, stderr) + return NewJSONLogger(stdout, stderr, nil) } // Write info messages and errors to stderr to prevent mixing with normal program output. - return NewJSONLogger(stderr, nil) + return NewJSONLogger(stderr, nil, nil) +} + +// AddNoSync adds a NOP Sync implementation. +func AddNopSync(writer io.Writer) zapcore.WriteSyncer { + return nopSync{Writer: writer} +} + +type nopSync struct { + io.Writer +} + +func (f nopSync) Sync() error { + return nil } diff --git a/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go b/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go index 7d894896bcd..ca4d6949090 100644 --- a/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go +++ b/staging/src/k8s.io/component-base/logs/json/json_benchmark_test.go @@ -26,7 +26,7 @@ import ( var writer = zapcore.AddSync(&writeSyncer{}) func BenchmarkInfoLoggerInfo(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -55,7 +55,7 @@ func BenchmarkInfoLoggerInfo(b *testing.B) { } func BenchmarkZapLoggerError(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -85,7 +85,7 @@ func BenchmarkZapLoggerError(b *testing.B) { } func BenchmarkZapLoggerV(b *testing.B) { - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { diff --git a/staging/src/k8s.io/component-base/logs/json/json_test.go b/staging/src/k8s.io/component-base/logs/json/json_test.go index 75edaad0986..a360d10c934 100644 --- a/staging/src/k8s.io/component-base/logs/json/json_test.go +++ b/staging/src/k8s.io/component-base/logs/json/json_test.go @@ -64,7 +64,7 @@ func TestZapLoggerInfo(t *testing.T) { for _, data := range testDataInfo { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.Info(data.msg, data.keysValues...) logStr := buffer.String() @@ -94,7 +94,7 @@ func TestZapLoggerInfo(t *testing.T) { // TestZapLoggerEnabled test ZapLogger enabled func TestZapLoggerEnabled(t *testing.T) { - sampleInfoLogger, _ := NewJSONLogger(nil, nil) + sampleInfoLogger, _ := NewJSONLogger(nil, nil, nil) for i := 0; i < 11; i++ { if !sampleInfoLogger.V(i).Enabled() { t.Errorf("V(%d).Info should be enabled", i) @@ -111,7 +111,7 @@ func TestZapLoggerV(t *testing.T) { for i := 0; i < 11; i++ { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond) logStr := buffer.String() var v, lineNo int @@ -138,7 +138,7 @@ func TestZapLoggerError(t *testing.T) { timeNow = func() time.Time { return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC) } - sampleInfoLogger, _ := NewJSONLogger(writer, nil) + sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil) sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond) logStr := buffer.String() var ts float64 @@ -156,7 +156,7 @@ func TestZapLoggerError(t *testing.T) { func TestZapLoggerStreams(t *testing.T) { var infoBuffer, errorBuffer bytes.Buffer - log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer)) + log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer), nil) log.Error(fmt.Errorf("some error"), "failed") log.Info("hello world") diff --git a/staging/src/k8s.io/component-base/logs/json/klog_test.go b/staging/src/k8s.io/component-base/logs/json/klog_test.go index ae6441ff3cf..19c26590e6c 100644 --- a/staging/src/k8s.io/component-base/logs/json/klog_test.go +++ b/staging/src/k8s.io/component-base/logs/json/klog_test.go @@ -239,7 +239,7 @@ func TestKlogIntegration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var buffer bytes.Buffer writer := zapcore.AddSync(&buffer) - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) klog.SetLogger(logger) defer klog.ClearLogger() @@ -270,7 +270,7 @@ func TestKlogIntegration(t *testing.T) { func TestKlogV(t *testing.T) { var buffer testBuff writer := zapcore.AddSync(&buffer) - logger, _ := NewJSONLogger(writer, writer) + logger, _ := NewJSONLogger(writer, nil, nil) klog.SetLogger(logger) defer klog.ClearLogger() fs := flag.FlagSet{} diff --git a/test/integration/logs/OWNERS b/test/integration/logs/OWNERS new file mode 100644 index 00000000000..2d8babf9531 --- /dev/null +++ b/test/integration/logs/OWNERS @@ -0,0 +1,11 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-instrumentation-approvers + - serathius + - pohly +reviewers: + - sig-instrumentation-reviewers +labels: + - sig/instrumentation + - wg/structured-logging diff --git a/test/integration/logs/benchmark/README.md b/test/integration/logs/benchmark/README.md new file mode 100644 index 00000000000..d68a9a8deda --- /dev/null +++ b/test/integration/logs/benchmark/README.md @@ -0,0 +1,43 @@ +# Benchmarking logging + +Any major changes to the logging code, whether it is in Kubernetes or in klog, +must be benchmarked before and after the change. + +## Running the benchmark + +``` +$ go test -bench=. -test.benchmem -benchmem . +``` + +## Real log data + +The files under `data` define test cases for specific aspects of formatting. To +test with a log file that represents output under some kind of real load, copy +the log file into `data/.log` and run benchmarking as described +above. `-bench=BenchmarkLogging/` can be used +to benchmark just the new file. + +When using `data/v/.log`, formatting will be done at +that log level. Symlinks can be created to simulating writing of the same log +data at different levels. + +No such real data is included in the Kubernetes repo because of their size. +They can be found in the "artifacts" of this +https://testgrid.kubernetes.io/sig-instrumentation-tests#kind-json-logging-master +Prow job: +- `artifacts/logs/kind-control-plane/containers` +- `artifacts/logs/kind-*/kubelet.log` + +With sufficient credentials, `gsutil` can be used to download everything for a job with: +``` +gsutil -m cp -R gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/ . +``` + +## Analyzing log data + +While loading a file, some statistics about it are collected. Those are shown +when running with: + +``` +$ go test -v -bench=. -test.benchmem -benchmem . +``` diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go new file mode 100644 index 00000000000..e810e88f3a3 --- /dev/null +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -0,0 +1,296 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "errors" + "flag" + "fmt" + "io" + "io/fs" + "os" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/go-logr/logr" + "k8s.io/component-base/logs" + logsjson "k8s.io/component-base/logs/json" + "k8s.io/klog/v2" +) + +func BenchmarkEncoding(b *testing.B) { + // Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log + // messages. We generate one sub-benchmark for each file where logging + // is tested with the log level from the directory. Symlinks can be + // used to test the same file with different levels. + if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if !strings.HasSuffix(path, ".log") { + return nil + } + messages, stats, err := loadLog(path) + if err != nil { + return err + } + if info.Mode()&fs.ModeSymlink == 0 { + b.Log(path + "\n" + stats.String()) + } + b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) { + // Take verbosity threshold from directory, if present. + vMatch := regexp.MustCompile(`/v(\d+)/`).FindStringSubmatch(path) + v := 0 + if vMatch != nil { + v, _ = strconv.Atoi(vMatch[1]) + } + fileSizes := map[string]int{} + b.Run("stats", func(b *testing.B) { + // Nothing to do. Use this for "go test -v + // -bench=BenchmarkLogging/.*/stats" to print + // just the statistics. + }) + b.Run("printf", func(b *testing.B) { + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + printf(item) + } + } + } + fileSizes["printf"] = int(output) / b.N + }) + b.Run("structured", func(b *testing.B) { + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + prints(item) + } + } + } + fileSizes["structured"] = int(output) / b.N + }) + b.Run("JSON", func(b *testing.B) { + klog.SetLogger(jsonLogger) + defer klog.ClearLogger() + b.ResetTimer() + output = 0 + for i := 0; i < b.N; i++ { + for _, item := range messages { + if item.verbosity <= v { + prints(item) + } + } + } + fileSizes["JSON"] = int(output) / b.N + }) + + b.Log(fmt.Sprintf("file sizes: %v\n", fileSizes)) + }) + return nil + }); err != nil { + b.Fatalf("reading 'data' directory: %v", err) + } +} + +type loadGeneratorConfig struct { + // Length of the message written in each log entry. + messageLength int + + // Percentage of error log entries written. + errorPercentage float64 + + // Number of concurrent goroutines that generate log entries. + workers int +} + +// BenchmarkWriting simulates writing of a stream which mixes info and error log +// messages at a certain ratio. In contrast to BenchmarkEncoding, this stresses +// the output handling and includes the usual additional information (caller, +// time stamp). +// +// See https://github.com/kubernetes/kubernetes/issues/107029 for the +// motivation. +func BenchmarkWriting(b *testing.B) { + flag.Set("skip_headers", "false") + defer flag.Set("skip_headers", "true") + + // This could be made configurable and/or we could benchmark different + // configurations automatically. + config := loadGeneratorConfig{ + messageLength: 300, + errorPercentage: 1.0, + workers: 100, + } + + benchmarkWriting(b, config) +} + +func benchmarkWriting(b *testing.B, config loadGeneratorConfig) { + b.Run("discard", func(b *testing.B) { + benchmarkOutputFormats(b, config, true) + }) + b.Run("tmp-files", func(b *testing.B) { + benchmarkOutputFormats(b, config, false) + }) +} + +func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) { + tmpDir := b.TempDir() + b.Run("structured", func(b *testing.B) { + var out *os.File + if !discard { + var err error + out, err = os.Create(path.Join(tmpDir, "all.log")) + if err != nil { + b.Fatal(err) + } + klog.SetOutput(out) + defer klog.SetOutput(&output) + } + generateOutput(b, config, nil, out) + }) + b.Run("JSON", func(b *testing.B) { + var logger logr.Logger + var flush func() + var out1, out2 *os.File + if !discard { + var err error + out1, err = os.Create(path.Join(tmpDir, "stream-1.log")) + if err != nil { + b.Fatal(err) + } + defer out1.Close() + out2, err = os.Create(path.Join(tmpDir, "stream-2.log")) + if err != nil { + b.Fatal(err) + } + defer out2.Close() + } + b.Run("single-stream", func(b *testing.B) { + if discard { + logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), nil, nil) + } else { + stderr := os.Stderr + os.Stderr = out1 + defer func() { + os.Stderr = stderr + }() + options := logs.NewOptions() + logger, flush = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, flush, out1) + }) + + b.Run("split-stream", func(b *testing.B) { + if discard { + logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), logsjson.AddNopSync(&output), nil) + } else { + stdout, stderr := os.Stdout, os.Stderr + os.Stdout, os.Stderr = out1, out2 + defer func() { + os.Stdout, os.Stderr = stdout, stderr + }() + options := logs.NewOptions() + options.Config.Options.JSON.SplitStream = true + logger, flush = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, flush, out1, out2) + }) + }) +} + +func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) { + msg := strings.Repeat("X", config.messageLength) + err := errors.New("fail") + start := time.Now() + + // Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low. + n := b.N * 1000 + + b.ResetTimer() + var wg sync.WaitGroup + for i := 0; i < config.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + acc := 0.0 + for i := 0; i < n; i++ { + if acc > 100 { + klog.ErrorS(err, msg, "key", "value") + acc -= 100 + } else { + klog.InfoS(msg, "key", "value") + } + acc += config.errorPercentage + } + }() + } + wg.Wait() + klog.Flush() + if flush != nil { + flush() + } + b.StopTimer() + + // Print some information about the result. + end := time.Now() + duration := end.Sub(start) + total := n * config.workers + b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds()) + for i, file := range files { + if file != nil { + pos, err := file.Seek(0, os.SEEK_END) + if err != nil { + b.Fatal(err) + } + if _, err := file.Seek(0, os.SEEK_SET); err != nil { + b.Fatal(err) + } + max := 50 + buffer := make([]byte, max) + actual, err := file.Read(buffer) + if err != nil { + if err != io.EOF { + b.Fatal(err) + } + buffer = nil + } + if actual == max { + buffer[max-3] = '.' + buffer[max-2] = '.' + buffer[max-1] = '.' + } + b.Logf(" %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", pos, i, float64(pos)/duration.Seconds()/1024/1024, string(buffer)) + } + } +} diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go new file mode 100644 index 00000000000..9adcf50b014 --- /dev/null +++ b/test/integration/logs/benchmark/common_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "flag" + "io" + + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + + logsjson "k8s.io/component-base/logs/json" + "k8s.io/klog/v2" +) + +func init() { + // Cause all klog output to be discarded with minimal overhead. + // We don't include time stamps and caller information. + // Individual tests can change that by calling flag.Set again, + // but should always restore this state here. + klog.InitFlags(nil) + flag.Set("alsologtostderr", "false") + flag.Set("logtostderr", "false") + flag.Set("skip_headers", "true") + flag.Set("one_output", "true") + flag.Set("stderrthreshold", "FATAL") + klog.SetOutput(&output) +} + +type bytesWritten int64 + +func (b *bytesWritten) Write(data []byte) (int, error) { + l := len(data) + *b += bytesWritten(l) + return l, nil +} + +func (b *bytesWritten) Sync() error { + return nil +} + +var output bytesWritten +var jsonLogger = newJSONLogger(&output) + +func newJSONLogger(out io.Writer) logr.Logger { + encoderConfig := &zapcore.EncoderConfig{ + MessageKey: "msg", + } + logger, _ := logsjson.NewJSONLogger(zapcore.AddSync(out), nil, encoderConfig) + return logger +} + +func printf(item logMessage) { + if item.isError { + klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs) + } else { + klog.Infof("%s: %v", item.msg, item.kvs) + } +} + +// These variables are a workaround for logcheck complaining about the dynamic +// parameters. +var ( + errorS = klog.ErrorS + infoS = klog.InfoS +) + +func prints(item logMessage) { + if item.isError { + errorS(item.err, item.msg, item.kvs...) + } else { + infoS(item.msg, item.kvs...) + } +} diff --git a/test/integration/logs/benchmark/data/container.log b/test/integration/logs/benchmark/data/container.log new file mode 100644 index 00000000000..4f8a6676f73 --- /dev/null +++ b/test/integration/logs/benchmark/data/container.log @@ -0,0 +1,2 @@ +# This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one. +Nov 19 02:13:48 kind-worker2 kubelet[250]: {"ts":1637288028968.0125,"caller":"kuberuntime/kuberuntime_manager.go:902","msg":"Creating container in pod","v":0,"container":{"Name":"terminate-cmd-rpn","Image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","Command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"TerminationMessagePath":"/dev/termination-log"}} diff --git a/test/integration/logs/benchmark/data/error-value.log b/test/integration/logs/benchmark/data/error-value.log new file mode 100644 index 00000000000..9c5bd4ced5d --- /dev/null +++ b/test/integration/logs/benchmark/data/error-value.log @@ -0,0 +1 @@ +{"v":0, "msg": "Pod status update", "err": "failed"} diff --git a/test/integration/logs/benchmark/data/error.log b/test/integration/logs/benchmark/data/error.log new file mode 100644 index 00000000000..114dd2a45ab --- /dev/null +++ b/test/integration/logs/benchmark/data/error.log @@ -0,0 +1 @@ +{"msg": "Pod status update", "err": "failed"} diff --git a/test/integration/logs/benchmark/data/simple.log b/test/integration/logs/benchmark/data/simple.log new file mode 100644 index 00000000000..2a4345b3bbe --- /dev/null +++ b/test/integration/logs/benchmark/data/simple.log @@ -0,0 +1 @@ +{"v": 0, "msg": "Pod status updated"} diff --git a/test/integration/logs/benchmark/data/values.log b/test/integration/logs/benchmark/data/values.log new file mode 100644 index 00000000000..f7c20c745f7 --- /dev/null +++ b/test/integration/logs/benchmark/data/values.log @@ -0,0 +1 @@ +{"v":0, "msg": "Example", "someValue": 1, "someString": "hello world", "pod": {"namespace": "system", "name": "kube-scheduler"}, "pv": {"name": "volume"}} diff --git a/test/integration/logs/benchmark/load.go b/test/integration/logs/benchmark/load.go new file mode 100644 index 00000000000..ccb5727b146 --- /dev/null +++ b/test/integration/logs/benchmark/load.go @@ -0,0 +1,269 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "os" + "reflect" + "regexp" + "sort" + "strings" + "text/template" + + "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +type logMessage struct { + msg string + verbosity int + err error + isError bool + kvs []interface{} +} + +const ( + stringArg = "string" + multiLineStringArg = "multiLineString" + objectStringArg = "objectString" + numberArg = "number" + krefArg = "kref" + otherArg = "other" + totalArg = "total" +) + +type logStats struct { + TotalLines, JsonLines, ErrorMessages int + + ArgCounts map[string]int + OtherLines []string + OtherArgs []interface{} + MultiLineArgs [][]string + ObjectTypes map[string]int +} + +var ( + logStatsTemplate = template.Must(template.New("format").Funcs(template.FuncMap{ + "percent": func(x, y int) string { + if y == 0 { + return "NA" + } + return fmt.Sprintf("%d%%", x*100/y) + }, + "sub": func(x, y int) int { + return x - y + }, + }).Parse(`Total number of lines: {{.TotalLines}} +Valid JSON messages: {{.JsonLines}} ({{percent .JsonLines .TotalLines}} of total lines) +Error messages: {{.ErrorMessages}} ({{percent .ErrorMessages .JsonLines}} of valid JSON messages) +Unrecognized lines: {{sub .TotalLines .JsonLines}} +{{range .OtherLines}} {{.}} +{{end}} +Args: + total: {{if .ArgCounts.total}}{{.ArgCounts.total}}{{else}}0{{end}}{{if .ArgCounts.string}} + strings: {{.ArgCounts.string}} ({{percent .ArgCounts.string .ArgCounts.total}}){{end}} {{if .ArgCounts.multiLineString}} + with line breaks: {{.ArgCounts.multiLineString}} ({{percent .ArgCounts.multiLineString .ArgCounts.total}} of all arguments) + {{range .MultiLineArgs}} ===== {{index . 0}} ===== +{{index . 1}} + +{{end}}{{end}}{{if .ArgCounts.objectString}} + with API objects: {{.ArgCounts.objectString}} ({{percent .ArgCounts.objectString .ArgCounts.total}} of all arguments) + types and their number of usage:{{range $key, $value := .ObjectTypes}} {{ $key }}:{{ $value }}{{end}}{{end}}{{if .ArgCounts.number}} + numbers: {{.ArgCounts.number}} ({{percent .ArgCounts.number .ArgCounts.total}}){{end}}{{if .ArgCounts.kref}} + ObjectRef: {{.ArgCounts.kref}} ({{percent .ArgCounts.kref .ArgCounts.total}}){{end}}{{if .ArgCounts.other}} + others: {{.ArgCounts.other}} ({{percent .ArgCounts.other .ArgCounts.total}}){{end}} +`)) +) + +// This produces too much output: +// {{range .OtherArgs}} {{.}} +// {{end}} + +// Doesn't work? +// Unrecognized lines: {{with $delta := sub .TotalLines .JsonLines}}{{$delta}} ({{percent $delta .TotalLines}} of total lines){{end}} + +func (s logStats) String() string { + var buffer bytes.Buffer + err := logStatsTemplate.Execute(&buffer, &s) + if err != nil { + return err.Error() + } + return buffer.String() +} + +func loadLog(path string) (messages []logMessage, stats logStats, err error) { + file, err := os.Open(path) + if err != nil { + return nil, logStats{}, err + } + defer file.Close() + + stats.ArgCounts = map[string]int{} + scanner := bufio.NewScanner(file) + for lineNo := 0; scanner.Scan(); lineNo++ { + line := scanner.Bytes() + msg, err := parseLine(line, &stats) + if err != nil { + stats.OtherLines = append(stats.OtherLines, fmt.Sprintf("%d: %s", lineNo, string(line))) + continue + } + messages = append(messages, msg) + } + + if err := scanner.Err(); err != nil { + return nil, logStats{}, fmt.Errorf("reading %s failed: %v", path, err) + } + + return +} + +// systemd prefix: +// Nov 19 02:08:51 kind-worker2 kubelet[250]: {"ts":1637287731687.8315,... +// +// kubectl (?) prefix: +// 2021-11-19T02:08:28.475825534Z stderr F {"ts": ... +var prefixRE = regexp.MustCompile(`^\w+ \d+ \S+ \S+ \S+: |\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z stderr . `) + +// String format for API structs from generated.pb.go. +// &Container{...} +var objectRE = regexp.MustCompile(`^&([a-zA-Z]*)\{`) + +func parseLine(line []byte, stats *logStats) (item logMessage, err error) { + stats.TotalLines++ + line = prefixRE.ReplaceAll(line, nil) + + content := map[string]interface{}{} + if err := json.Unmarshal(line, &content); err != nil { + return logMessage{}, fmt.Errorf("JSON parsing failed: %v", err) + } + stats.JsonLines++ + + kvs := map[string]interface{}{} + item.isError = true + for key, value := range content { + switch key { + case "v": + verbosity, ok := value.(float64) + if !ok { + return logMessage{}, fmt.Errorf("expected number for v, got: %T %v", value, value) + } + item.verbosity = int(verbosity) + item.isError = false + case "msg": + msg, ok := value.(string) + if !ok { + return logMessage{}, fmt.Errorf("expected string for msg, got: %T %v", value, value) + } + item.msg = msg + case "ts", "caller": + // ignore + case "err": + errStr, ok := value.(string) + if !ok { + return logMessage{}, fmt.Errorf("expected string for err, got: %T %v", value, value) + } + item.err = errors.New(errStr) + stats.ArgCounts[stringArg]++ + stats.ArgCounts[totalArg]++ + default: + if obj := toObject(value); obj != nil { + value = obj + } + switch value := value.(type) { + case string: + stats.ArgCounts[stringArg]++ + if strings.Contains(value, "\n") { + stats.ArgCounts[multiLineStringArg]++ + stats.MultiLineArgs = append(stats.MultiLineArgs, []string{key, value}) + } + match := objectRE.FindStringSubmatch(value) + if match != nil { + if stats.ObjectTypes == nil { + stats.ObjectTypes = map[string]int{} + } + stats.ArgCounts[objectStringArg]++ + stats.ObjectTypes[match[1]]++ + } + case float64: + stats.ArgCounts[numberArg]++ + case klog.ObjectRef: + stats.ArgCounts[krefArg]++ + default: + stats.ArgCounts[otherArg]++ + stats.OtherArgs = append(stats.OtherArgs, value) + } + stats.ArgCounts[totalArg]++ + kvs[key] = value + } + } + + // Sort by key. + var keys []string + for key := range kvs { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + item.kvs = append(item.kvs, key, kvs[key]) + } + + if !item.isError && item.err != nil { + // Error is a normal key/value. + item.kvs = append(item.kvs, "err", item.err) + item.err = nil + } + if item.isError { + stats.ErrorMessages++ + } + return +} + +// This is a list of objects that might have been dumped. The simple ones must +// come first because unmarshaling will try one after the after and an +// ObjectRef would unmarshal fine into any of the others whereas any of the +// other types hopefully have enough extra fields that they won't fit (unknown +// fields are an error). +var objectTypes = []reflect.Type{ + reflect.TypeOf(klog.ObjectRef{}), + reflect.TypeOf(&v1.Pod{}), + reflect.TypeOf(&v1.Container{}), +} + +func toObject(value interface{}) interface{} { + data, ok := value.(map[string]interface{}) + if !ok { + return nil + } + jsonData, err := json.Marshal(data) + if err != nil { + return nil + } + for _, t := range objectTypes { + obj := reflect.New(t) + decoder := json.NewDecoder(bytes.NewBuffer(jsonData)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(obj.Interface()); err == nil { + return reflect.Indirect(obj).Interface() + } + } + return nil +} diff --git a/test/integration/logs/benchmark/load_test.go b/test/integration/logs/benchmark/load_test.go new file mode 100644 index 00000000000..496377e0858 --- /dev/null +++ b/test/integration/logs/benchmark/load_test.go @@ -0,0 +1,233 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmark + +import ( + "bytes" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +func TestData(t *testing.T) { + container := v1.Container{ + Command: []string{"sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"}, + Image: "k8s.gcr.io/e2e-test-images/busybox:1.29-2", + Name: "terminate-cmd-rpn", + TerminationMessagePath: "/dev/termination-log", + } + + testcases := map[string]struct { + messages []logMessage + printf, structured, json string + stats logStats + }{ + "data/simple.log": { + messages: []logMessage{ + { + msg: "Pod status updated", + }, + }, + printf: `Pod status updated: [] +`, + structured: `"Pod status updated" +`, + json: `{"msg":"Pod status updated","v":0} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{}, + }, + }, + "data/error.log": { + messages: []logMessage{ + { + msg: "Pod status update", + err: errors.New("failed"), + isError: true, + }, + }, + printf: `Pod status update: failed [] +`, + structured: `"Pod status update" err="failed" +`, + json: `{"msg":"Pod status update","err":"failed"} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ErrorMessages: 1, + ArgCounts: map[string]int{ + stringArg: 1, + totalArg: 1, + }, + }, + }, + "data/error-value.log": { + messages: []logMessage{ + { + msg: "Pod status update", + kvs: []interface{}{"err", errors.New("failed")}, + }, + }, + printf: `Pod status update: [err failed] +`, + structured: `"Pod status update" err="failed" +`, + json: `{"msg":"Pod status update","v":0,"err":"failed"} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{ + stringArg: 1, + totalArg: 1, + }, + }, + }, + "data/values.log": { + messages: []logMessage{ + { + msg: "Example", + kvs: []interface{}{ + "pod", klog.KRef("system", "kube-scheduler"), + "pv", klog.KRef("", "volume"), + "someString", "hello world", + "someValue", 1.0, + }, + }, + }, + printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1] +`, + structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1 +`, + json: `{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1} +`, + stats: logStats{ + TotalLines: 1, + JsonLines: 1, + ArgCounts: map[string]int{ + stringArg: 1, + krefArg: 2, + numberArg: 1, + totalArg: 4, + }, + }, + }, + "data/container.log": { + messages: []logMessage{ + { + msg: "Creating container in pod", + kvs: []interface{}{ + "container", &container, + }, + }, + }, + printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c +f=/restart-count/restartCount +count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) +if [ $count -eq 1 ]; then + exit 1 +fi +if [ $count -eq 2 ]; then + exit 0 +fi +while true; do sleep 1; done +],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}] +`, + structured: `"Creating container in pod" container=< + &Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c + f=/restart-count/restartCount + count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) + if [ $count -eq 1 ]; then + exit 1 + fi + if [ $count -eq 2 ]; then + exit 0 + fi + while true; do sleep 1; done + ],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,} + > +`, + // This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead. + // json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}} + // `, + json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}"} +`, + stats: logStats{ + TotalLines: 2, + JsonLines: 1, + ArgCounts: map[string]int{ + totalArg: 1, + otherArg: 1, + }, + OtherLines: []string{ + "0: # This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.", + }, + OtherArgs: []interface{}{ + &container, + }, + }, + }, + } + + for path, expected := range testcases { + t.Run(path, func(t *testing.T) { + messages, stats, err := loadLog(path) + if err != nil { + t.Fatalf("unexpected load error: %v", err) + } + assert.Equal(t, expected.messages, messages) + assert.Equal(t, expected.stats, stats) + print := func(format func(item logMessage)) { + for _, item := range expected.messages { + format(item) + } + } + testBuffered := func(t *testing.T, expected string, format func(item logMessage)) { + var buffer bytes.Buffer + klog.SetOutput(&buffer) + defer klog.SetOutput(&output) + + print(format) + klog.Flush() + assert.Equal(t, expected, buffer.String()) + } + + t.Run("printf", func(t *testing.T) { + testBuffered(t, expected.printf, printf) + }) + t.Run("structured", func(t *testing.T) { + testBuffered(t, expected.structured, prints) + }) + t.Run("json", func(t *testing.T) { + var buffer bytes.Buffer + logger := newJSONLogger(&buffer) + klog.SetLogger(logger) + defer klog.ClearLogger() + print(prints) + klog.Flush() + assert.Equal(t, expected.json, buffer.String()) + }) + }) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ca070ad2045..47d6ca2d61c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -933,6 +933,7 @@ go.uber.org/atomic # go.uber.org/multierr v1.6.0 => go.uber.org/multierr v1.6.0 go.uber.org/multierr # go.uber.org/zap v1.19.0 => go.uber.org/zap v1.19.0 +## explicit go.uber.org/zap go.uber.org/zap/buffer go.uber.org/zap/internal/bufferpool