diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index ba40436eea8..38249c648ef 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -18,7 +18,6 @@ package benchmark import ( "errors" - "flag" "fmt" "io" "io/fs" @@ -31,8 +30,9 @@ import ( "testing" "time" + "k8s.io/component-base/featuregate" logsapi "k8s.io/component-base/logs/api/v1" - logsjson "k8s.io/component-base/logs/json" + _ "k8s.io/component-base/logs/json/register" "k8s.io/klog/v2" ) @@ -69,44 +69,54 @@ func BenchmarkEncoding(b *testing.B) { if vMatch != nil { v, _ = strconv.Atoi(vMatch[1]) } + fileSizes := map[string]int{} - b.Run("printf", func(b *testing.B) { + test := func(b *testing.B, format string, print func(logger klog.Logger, item logMessage)) { + state := klog.CaptureState() + defer state.Restore() + + var output bytesWritten + c := logsapi.NewLoggingConfiguration() + c.Format = format + o := logsapi.LoggingOptions{ + ErrorStream: &output, + InfoStream: &output, + } + klog.SetOutput(&output) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil { + b.Fatalf("Unexpected error configuring logging: %v", err) + } + logger := klog.Background() b.ResetTimer() - output = 0 + start := time.Now() + total := int64(0) for i := 0; i < b.N; i++ { for _, item := range messages { if item.verbosity <= v { - printf(item) + print(logger, item) + total++ } } } - fileSizes["printf"] = int(output) / b.N + end := time.Now() + duration := end.Sub(start) + + // Report messages/s instead of ns/op because "op" varies. + b.ReportMetric(0, "ns/op") + b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s") + fileSizes[filepath.Base(b.Name())] = int(output) + } + + b.Run("printf", func(b *testing.B) { + test(b, "text", func(_ klog.Logger, item logMessage) { + printf(item) + }) }) b.Run("structured", func(b *testing.B) { - b.ResetTimer() - output = 0 - for i := 0; i < b.N; i++ { - for _, item := range messages { - if item.verbosity <= v { - prints(item) - } - } - } - fileSizes["structured"] = int(output) / b.N + test(b, "text", prints) }) b.Run("JSON", func(b *testing.B) { - klog.SetLogger(jsonLogger) - defer klog.ClearLogger() - b.ResetTimer() - output = 0 - for i := 0; i < b.N; i++ { - for _, item := range messages { - if item.verbosity <= v { - prints(item) - } - } - } - fileSizes["JSON"] = int(output) / b.N + test(b, "json", prints) }) b.Log(fmt.Sprintf("%s: file sizes: %v\n", path, fileSizes)) @@ -136,9 +146,6 @@ type loadGeneratorConfig struct { // See https://github.com/kubernetes/kubernetes/issues/107029 for the // motivation. func BenchmarkWriting(b *testing.B) { - flag.Set("skip_headers", "false") - defer flag.Set("skip_headers", "true") - // This could be made configurable and/or we could benchmark different // configurations automatically. config := loadGeneratorConfig{ @@ -160,70 +167,92 @@ func benchmarkWriting(b *testing.B, config loadGeneratorConfig) { } func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) { - tmpDir := b.TempDir() b.Run("structured", func(b *testing.B) { - var out *os.File - if !discard { - var err error - out, err = os.Create(filepath.Join(tmpDir, "all.log")) - if err != nil { - b.Fatal(err) - } - klog.SetOutput(out) - defer klog.SetOutput(&output) - } - generateOutput(b, config, nil, out) + benchmarkOutputFormat(b, config, discard, "text") }) b.Run("JSON", func(b *testing.B) { - var out1, out2 *os.File - if !discard { - var err error - out1, err = os.Create(filepath.Join(tmpDir, "stream-1.log")) - if err != nil { - b.Fatal(err) - } - defer out1.Close() - out2, err = os.Create(filepath.Join(tmpDir, "stream-2.log")) - if err != nil { - b.Fatal(err) - } - defer out2.Close() - } - o := logsapi.LoggingOptions{} - if discard { - o.ErrorStream = io.Discard - o.InfoStream = io.Discard - } else { - o.ErrorStream = out1 - o.InfoStream = out1 - } - - b.Run("single-stream", func(b *testing.B) { - c := logsapi.NewLoggingConfiguration() - logger, control := logsjson.Factory{}.Create(*c, o) - klog.SetLogger(logger) - defer klog.ClearLogger() - generateOutput(b, config, control.Flush, out1) - }) - - b.Run("split-stream", func(b *testing.B) { - c := logsapi.NewLoggingConfiguration() - c.Options.JSON.SplitStream = true - logger, control := logsjson.Factory{}.Create(*c, o) - klog.SetLogger(logger) - defer klog.ClearLogger() - generateOutput(b, config, control.Flush, out1, out2) - }) + benchmarkOutputFormat(b, config, discard, "json") }) } -func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) { +func benchmarkOutputFormat(b *testing.B, config loadGeneratorConfig, discard bool, format string) { + b.Run("single-stream", func(b *testing.B) { + benchmarkOutputFormatStream(b, config, discard, format, false) + }) + b.Run("split-stream", func(b *testing.B) { + benchmarkOutputFormatStream(b, config, discard, format, true) + }) +} + +func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, discard bool, format string, splitStreams bool) { + tmpDir := b.TempDir() + state := klog.CaptureState() + defer state.Restore() + + featureGate := featuregate.NewFeatureGate() + logsapi.AddFeatureGates(featureGate) + if err := featureGate.SetFromMap(map[string]bool{ + string(logsapi.LoggingAlphaOptions): true, + string(logsapi.LoggingBetaOptions): true, + }); err != nil { + b.Fatalf("Set feature gates: %v", err) + } + + // Create a logging configuration using the exact same code as a normal + // component. In order to redirect output, we provide a LoggingOptions + // instance. + var o logsapi.LoggingOptions + c := logsapi.NewLoggingConfiguration() + c.Format = format + if splitStreams { + c.Options.JSON.SplitStream = true + if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil { + b.Fatalf("Error setting buffer size: %v", err) + } + } + var files []*os.File + if discard { + o.ErrorStream = io.Discard + o.InfoStream = io.Discard + } else { + out1, err := os.Create(filepath.Join(tmpDir, "stream-1.log")) + if err != nil { + b.Fatal(err) + } + defer out1.Close() + out2, err := os.Create(filepath.Join(tmpDir, "stream-2.log")) + if err != nil { + b.Fatal(err) + } + defer out2.Close() + + if splitStreams { + files = append(files, out1, out2) + o.ErrorStream = out1 + o.InfoStream = out2 + } else { + files = append(files, out1) + o.ErrorStream = out1 + o.InfoStream = out1 + } + } + + klog.SetOutput(o.ErrorStream) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil { + b.Fatalf("Unexpected error configuring logging: %v", err) + } + + generateOutput(b, config, files...) +} + +func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) { msg := strings.Repeat("X", config.messageLength) err := errors.New("fail") start := time.Now() // Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low. n := b.N * 1000 + total := config.workers * n b.ResetTimer() var wg sync.WaitGroup @@ -246,15 +275,15 @@ func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), file } wg.Wait() klog.Flush() - if flush != nil { - flush() - } b.StopTimer() - - // Print some information about the result. end := time.Now() duration := end.Sub(start) - total := n * config.workers + + // Report messages/s instead of ns/op because "op" varies. + b.ReportMetric(0, "ns/op") + b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s") + + // Print some information about the result. b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds()) for i, file := range files { if file != nil { diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index 0b6fae600f3..76d851a44ce 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -18,28 +18,20 @@ package benchmark import ( "flag" - "io" - "github.com/go-logr/logr" - "go.uber.org/zap/zapcore" - - logsapi "k8s.io/component-base/logs/api/v1" - logsjson "k8s.io/component-base/logs/json" "k8s.io/klog/v2" ) func init() { - // Cause all klog output to be discarded with minimal overhead. - // We don't include time stamps and caller information. - // Individual tests can change that by calling flag.Set again, - // but should always restore this state here. + // hack/make-rules/test-integration.sh expects that all unit tests + // support -v and -vmodule. klog.InitFlags(nil) + + // Write all output into a single file. flag.Set("alsologtostderr", "false") flag.Set("logtostderr", "false") - flag.Set("skip_headers", "true") flag.Set("one_output", "true") flag.Set("stderrthreshold", "FATAL") - klog.SetOutput(&output) } type bytesWritten int64 @@ -50,22 +42,6 @@ func (b *bytesWritten) Write(data []byte) (int, error) { return l, nil } -func (b *bytesWritten) Sync() error { - return nil -} - -var output bytesWritten -var jsonLogger = newJSONLogger(&output) - -func newJSONLogger(out io.Writer) logr.Logger { - encoderConfig := &zapcore.EncoderConfig{ - MessageKey: "msg", - } - c := logsapi.NewLoggingConfiguration() - logger, _ := logsjson.NewJSONLogger(c.Verbosity, zapcore.AddSync(out), nil, encoderConfig) - return logger -} - func printf(item logMessage) { if item.isError { klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs) @@ -74,17 +50,14 @@ func printf(item logMessage) { } } -// These variables are a workaround for logcheck complaining about the dynamic -// parameters. -var ( - errorS = klog.ErrorS - infoS = klog.InfoS -) - -func prints(item logMessage) { +func prints(logger klog.Logger, item logMessage) { if item.isError { - errorS(item.err, item.msg, item.kvs...) + logger.Error(item.err, item.msg, item.kvs...) // nolint: logcheck } else { - infoS(item.msg, item.kvs...) + logger.Info(item.msg, item.kvs...) // nolint: logcheck } } + +func printLogger(item logMessage) { + prints(klog.Background(), item) +} diff --git a/test/integration/logs/benchmark/load_test.go b/test/integration/logs/benchmark/load_test.go index 36a05987caa..c031756841d 100644 --- a/test/integration/logs/benchmark/load_test.go +++ b/test/integration/logs/benchmark/load_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + logsapi "k8s.io/component-base/logs/api/v1" + _ "k8s.io/component-base/logs/json/register" "k8s.io/klog/v2" ) @@ -37,7 +39,9 @@ func TestData(t *testing.T) { } testcases := map[string]struct { - messages []logMessage + messages []logMessage + // These are subsets of the full output and may be empty. + // Prefix and variable stack traces therefore aren't compared. printf, structured, json string stats logStats }{ @@ -47,12 +51,9 @@ func TestData(t *testing.T) { msg: "Pod status updated", }, }, - printf: `Pod status updated: [] -`, - structured: `"Pod status updated" -`, - json: `{"msg":"Pod status updated","v":0} -`, + printf: `Pod status updated: []`, + structured: `"Pod status updated"`, + json: `"msg":"Pod status updated","v":0`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -68,15 +69,6 @@ func TestData(t *testing.T) { msg: "Pod status updated again", }, }, - printf: `Pod status updated: [] -Pod status updated again: [] -`, - structured: `"Pod status updated" -"Pod status updated again" -`, - json: `{"msg":"Pod status updated","v":0} -{"msg":"Pod status updated again","v":0} -`, stats: logStats{ TotalLines: 3, SplitLines: 1, @@ -92,12 +84,9 @@ Pod status updated again: [] isError: true, }, }, - printf: `Pod status update: failed [] -`, - structured: `"Pod status update" err="failed" -`, - json: `{"msg":"Pod status update","err":"failed"} -`, + printf: `Pod status update: failed []`, + structured: `"Pod status update" err="failed"`, + json: `"msg":"Pod status update","err":"failed"`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -115,12 +104,9 @@ Pod status updated again: [] kvs: []interface{}{"err", errors.New("failed")}, }, }, - printf: `Pod status update: [err failed] -`, - structured: `"Pod status update" err="failed" -`, - json: `{"msg":"Pod status update","v":0,"err":"failed"} -`, + printf: `Pod status update: [err failed]`, + structured: `"Pod status update" err="failed"`, + json: `"msg":"Pod status update","v":0,"err":"failed"`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -142,12 +128,9 @@ Pod status updated again: [] }, }, }, - printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1] -`, - structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1 -`, - json: `{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1} -`, + printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1]`, + structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1`, + json: `"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -168,18 +151,7 @@ Pod status updated again: [] }, }, }, - printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c -f=/restart-count/restartCount -count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) -if [ $count -eq 1 ]; then - exit 1 -fi -if [ $count -eq 2 ]; then - exit 0 -fi -while true; do sleep 1; done -],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}] -`, + printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c`, structured: `"Creating container in pod" container=< &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c f=/restart-count/restartCount @@ -192,13 +164,11 @@ while true; do sleep 1; done fi while true; do sleep 1; done ],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},} - > -`, + >`, // This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead. // json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"registry.k8s.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}} // `, - json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}"} -`, + json: `"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}"`, stats: logStats{ TotalLines: 2, JsonLines: 1, @@ -216,25 +186,33 @@ while true; do sleep 1; done }, } - for path, expected := range testcases { - t.Run(path, func(t *testing.T) { - messages, stats, err := loadLog(path) + for filePath, expected := range testcases { + t.Run(filePath, func(t *testing.T) { + messages, stats, err := loadLog(filePath) if err != nil { t.Fatalf("unexpected load error: %v", err) } assert.Equal(t, expected.messages, messages) assert.Equal(t, expected.stats, stats) - print := func(format func(item logMessage)) { + printAll := func(format func(item logMessage)) { for _, item := range expected.messages { format(item) } } - testBuffered := func(t *testing.T, expected string, format func(item logMessage)) { + testBuffered := func(t *testing.T, expected string, format string, print func(item logMessage)) { var buffer bytes.Buffer + c := logsapi.NewLoggingConfiguration() + c.Format = format + o := logsapi.LoggingOptions{ + ErrorStream: &buffer, + InfoStream: &buffer, + } klog.SetOutput(&buffer) - defer klog.SetOutput(&output) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil { + t.Fatalf("Unexpected error configuring logging: %v", err) + } - print(format) + printAll(print) klog.Flush() if !strings.Contains(buffer.String(), expected) { @@ -243,19 +221,13 @@ while true; do sleep 1; done } t.Run("printf", func(t *testing.T) { - testBuffered(t, expected.printf, printf) + testBuffered(t, expected.printf, "text", printf) }) t.Run("structured", func(t *testing.T) { - testBuffered(t, expected.structured, prints) + testBuffered(t, expected.structured, "text", printLogger) }) t.Run("json", func(t *testing.T) { - var buffer bytes.Buffer - logger := newJSONLogger(&buffer) - klog.SetLogger(logger) - defer klog.ClearLogger() - print(prints) - klog.Flush() - assert.Equal(t, expected.json, buffer.String()) + testBuffered(t, expected.json, "json", printLogger) }) }) }