diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index ba15085410d..2312a485c4c 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -17,18 +17,29 @@ limitations under the License. package benchmark import ( + "errors" + "flag" "fmt" + "io" "io/fs" + "os" + "path" "path/filepath" "regexp" "strconv" "strings" + "sync" "testing" + "time" + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + "k8s.io/component-base/logs" + logsjson "k8s.io/component-base/logs/json" "k8s.io/klog/v2" ) -func BenchmarkLogging(b *testing.B) { +func BenchmarkEncoding(b *testing.B) { // Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log // messages. We generate one sub-benchmark for each file where logging // is tested with the log level from the directory. Symlinks can be @@ -106,3 +117,177 @@ func BenchmarkLogging(b *testing.B) { b.Fatalf("reading 'data' directory: %v", err) } } + +type loadGeneratorConfig struct { + // Length of the message written in each log entry. + messageLength int + + // Percentage of error log entries written. + errorPercentage float64 + + // Number of concurrent goroutines that generate log entries. + workers int +} + +// BenchmarkWriting simulates writing of a stream which mixes info and error log +// messages at a certain ratio. In contrast to BenchmarkEncoding, this stresses +// the output handling and includes the usual additional information (caller, +// time stamp). +// +// See https://github.com/kubernetes/kubernetes/issues/107029 for the +// motivation. +func BenchmarkWriting(b *testing.B) { + flag.Set("skip_headers", "false") + defer flag.Set("skip_headers", "true") + + // This could be made configurable and/or we could benchmark different + // configurations automatically. + config := loadGeneratorConfig{ + messageLength: 300, + errorPercentage: 1.0, + workers: 100, + } + + benchmarkWriting(b, config) +} + +func benchmarkWriting(b *testing.B, config loadGeneratorConfig) { + b.Run("discard", func(b *testing.B) { + benchmarkOutputFormats(b, config, true) + }) + b.Run("tmp-files", func(b *testing.B) { + benchmarkOutputFormats(b, config, false) + }) +} + +func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) { + tmpDir := b.TempDir() + b.Run("structured", func(b *testing.B) { + var out *os.File + if !discard { + var err error + out, err = os.Create(path.Join(tmpDir, "all.log")) + if err != nil { + b.Fatal(err) + } + klog.SetOutput(out) + defer klog.SetOutput(&output) + } + generateOutput(b, config, out) + }) + b.Run("JSON", func(b *testing.B) { + var logger logr.Logger + var out1, out2 *os.File + if !discard { + var err error + out1, err = os.Create(path.Join(tmpDir, "stream-1.log")) + if err != nil { + b.Fatal(err) + } + defer out1.Close() + out2, err = os.Create(path.Join(tmpDir, "stream-2.log")) + if err != nil { + b.Fatal(err) + } + defer out2.Close() + } + b.Run("single-stream", func(b *testing.B) { + if discard { + logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), nil, nil) + } else { + stderr := os.Stderr + os.Stderr = out1 + defer func() { + os.Stderr = stderr + }() + options := logs.NewOptions() + logger, _ = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, out1) + }) + + b.Run("split-stream", func(b *testing.B) { + if discard { + logger, _ = logsjson.NewJSONLogger(zapcore.AddSync(&output), zapcore.AddSync(&output), nil) + } else { + stdout, stderr := os.Stdout, os.Stderr + os.Stdout, os.Stderr = out1, out2 + defer func() { + os.Stdout, os.Stderr = stdout, stderr + }() + options := logs.NewOptions() + options.Config.Options.JSON.SplitStream = true + logger, _ = logsjson.Factory{}.Create(options.Config.Options) + } + klog.SetLogger(logger) + defer klog.ClearLogger() + generateOutput(b, config, out1, out2) + }) + }) +} + +func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) { + msg := strings.Repeat("X", config.messageLength) + err := errors.New("fail") + start := time.Now() + + // Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low. + n := b.N * 1000 + + b.ResetTimer() + var wg sync.WaitGroup + for i := 0; i < config.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + acc := 0.0 + for i := 0; i < n; i++ { + if acc > 100 { + klog.ErrorS(err, msg, "key", "value") + acc -= 100 + } else { + klog.InfoS(msg, "key", "value") + } + acc += config.errorPercentage + } + }() + } + wg.Wait() + klog.Flush() + b.StopTimer() + + // Print some information about the result. + end := time.Now() + duration := end.Sub(start) + total := n * config.workers + b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds()) + for i, file := range files { + if file != nil { + pos, err := file.Seek(0, os.SEEK_END) + if err != nil { + b.Fatal(err) + } + if _, err := file.Seek(0, os.SEEK_SET); err != nil { + b.Fatal(err) + } + max := 50 + buffer := make([]byte, max) + actual, err := file.Read(buffer) + if err != nil { + if err != io.EOF { + b.Fatal(err) + } + buffer = nil + } + if actual == max { + buffer[max-3] = '.' + buffer[max-2] = '.' + buffer[max-1] = '.' + } + b.Logf(" %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", pos, i, float64(pos)/duration.Seconds()/1024/1024, string(buffer)) + } + } +} diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index b365c475823..9adcf50b014 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -30,6 +30,8 @@ import ( func init() { // Cause all klog output to be discarded with minimal overhead. // We don't include time stamps and caller information. + // Individual tests can change that by calling flag.Set again, + // but should always restore this state here. klog.InitFlags(nil) flag.Set("alsologtostderr", "false") flag.Set("logtostderr", "false")