From 7f1a30f8d593e4c3421de53a8af53ab0399a76f6 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 11 Jan 2024 18:55:21 +0100 Subject: [PATCH 1/3] logs benchmark: fix config handling The logs config must be reset explicitly now when changing it multiple times per process. --- test/integration/logs/benchmark/benchmark_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index 6e0cbe1a438..079c198a665 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -82,6 +82,11 @@ func BenchmarkEncoding(b *testing.B) { InfoStream: &output, } klog.SetOutput(&output) + defer func() { + if err := logsapi.ResetForTest(nil); err != nil { + b.Errorf("error resetting logsapi: %v", err) + } + }() if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil { b.Fatalf("Unexpected error configuring logging: %v", err) } @@ -237,6 +242,11 @@ func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, disca } klog.SetOutput(o.ErrorStream) + defer func() { + if err := logsapi.ResetForTest(nil); err != nil { + b.Errorf("error resetting logsapi: %v", err) + } + }() if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil { b.Fatalf("Unexpected error configuring logging: %v", err) } From 04b772c66112acd82d58b0a5598bfa5049c4543d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 11 Jan 2024 18:55:42 +0100 Subject: [PATCH 2/3] logs benchmark: really write through pipe While the benchmark is focused on encoding, it becomes a bit more realistic when actually passing the encoded data to the Linux kernel. Features like output buffering are more likely to have a visible effect when invoking syscalls. --- .../logs/benchmark/benchmark_test.go | 12 ++++++---- .../integration/logs/benchmark/common_test.go | 23 +++++++++++++++---- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index 079c198a665..f7f38822dd4 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -74,14 +74,16 @@ func BenchmarkEncoding(b *testing.B) { state := klog.CaptureState() defer state.Restore() - var output bytesWritten + // To make the tests a bit more realistic, at + // least do system calls during each write. + output := newBytesWritten(b, "/dev/null") c := logsapi.NewLoggingConfiguration() c.Format = format o := logsapi.LoggingOptions{ - ErrorStream: &output, - InfoStream: &output, + ErrorStream: output, + InfoStream: output, } - klog.SetOutput(&output) + klog.SetOutput(output) defer func() { if err := logsapi.ResetForTest(nil); err != nil { b.Errorf("error resetting logsapi: %v", err) @@ -108,7 +110,7 @@ func BenchmarkEncoding(b *testing.B) { // Report messages/s instead of ns/op because "op" varies. b.ReportMetric(0, "ns/op") b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s") - fileSizes[filepath.Base(b.Name())] = int(output) + fileSizes[filepath.Base(b.Name())] = int(output.bytesWritten) } b.Run("printf", func(b *testing.B) { diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index 76d851a44ce..a6c4ace4b0a 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -18,6 +18,8 @@ package benchmark import ( "flag" + "os" + "testing" "k8s.io/klog/v2" ) @@ -34,12 +36,25 @@ func init() { flag.Set("stderrthreshold", "FATAL") } -type bytesWritten int64 +func newBytesWritten(tb testing.TB, filename string) *bytesWritten { + out, err := os.Create(filename) + if err != nil { + tb.Fatalf("open fake output: %v", err) + } + tb.Cleanup(func() { _ = out.Close() }) + return &bytesWritten{ + out: out, + } +} + +type bytesWritten struct { + out *os.File + bytesWritten int64 +} func (b *bytesWritten) Write(data []byte) (int, error) { - l := len(data) - *b += bytesWritten(l) - return l, nil + b.bytesWritten += int64(len(data)) + return b.out.Write(data) } func printf(item logMessage) { From 8f4c9c7605ab47a73104e6eb24d9d19c2ef0b6e5 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 20 Dec 2022 19:39:45 +0100 Subject: [PATCH 3/3] k8s.io/component-base/logs: replace klog text implementation This replaces the klog formatting and message routing with a simpler implementation that uses less code. The main difference is that we skip the entire unused message routing. Instead, the same split output streams as for JSON gets implemented in the io.Writer implementation that gets passed to the textlogger. --- cmd/kube-proxy/app/server_test.go | 9 +- pkg/kubelet/apis/config/helpers_test.go | 24 ++- .../KubeletConfiguration/after/v1beta1.yaml | 2 + .../roundtrip/default/v1beta1.yaml | 2 + .../after/v1alpha1.yaml | 2 + .../roundtrip/default/v1alpha1.yaml | 2 + .../component-base/logs/api/v1/options.go | 36 ++++- .../logs/api/v1/options_test.go | 20 +++ .../component-base/logs/api/v1/registry.go | 2 +- .../k8s.io/component-base/logs/api/v1/text.go | 142 ++++++++++++++++++ .../component-base/logs/api/v1/types.go | 13 ++ .../component-base/logs/api/v1/types_test.go | 21 ++- .../logs/api/v1/validate_test.go | 8 +- .../logs/api/v1/zz_generated.deepcopy.go | 37 ++++- .../logs/json/register/register_test.go | 2 + .../logs/benchmark/benchmark_test.go | 4 + 16 files changed, 305 insertions(+), 21 deletions(-) create mode 100644 staging/src/k8s.io/component-base/logs/api/v1/text.go diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index 197dd772e6d..37559f10c1c 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -422,7 +422,14 @@ kind: KubeProxyConfiguration }, Options: logsapi.FormatOptions{ JSON: logsapi.JSONOptions{ - InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")}, + OutputRoutingOptions: logsapi.OutputRoutingOptions{ + InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")}, + }, + }, + Text: logsapi.TextOptions{ + OutputRoutingOptions: logsapi.OutputRoutingOptions{ + InfoBufferSize: resource.QuantityValue{Quantity: resource.MustParse("0")}, + }, }, }, } diff --git a/pkg/kubelet/apis/config/helpers_test.go b/pkg/kubelet/apis/config/helpers_test.go index 31f406b85f2..06fec06cf27 100644 --- a/pkg/kubelet/apis/config/helpers_test.go +++ b/pkg/kubelet/apis/config/helpers_test.go @@ -212,14 +212,22 @@ var ( "HealthzPort", "Logging.FlushFrequency", "Logging.Format", - "Logging.Options.JSON.InfoBufferSize.Quantity.Format", - "Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.scale", - "Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.unscaled.abs[*]", - "Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.unscaled.neg", - "Logging.Options.JSON.InfoBufferSize.Quantity.i.scale", - "Logging.Options.JSON.InfoBufferSize.Quantity.i.value", - "Logging.Options.JSON.InfoBufferSize.Quantity.s", - "Logging.Options.JSON.SplitStream", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.Format", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.scale", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.unscaled.abs[*]", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.unscaled.neg", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.i.scale", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.i.value", + "Logging.Options.JSON.OutputRoutingOptions.InfoBufferSize.Quantity.s", + "Logging.Options.JSON.OutputRoutingOptions.SplitStream", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.Format", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.scale", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.unscaled.abs[*]", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.d.Dec.unscaled.neg", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.i.scale", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.i.value", + "Logging.Options.Text.OutputRoutingOptions.InfoBufferSize.Quantity.s", + "Logging.Options.Text.OutputRoutingOptions.SplitStream", "Logging.VModule[*].FilePattern", "Logging.VModule[*].Verbosity", "Logging.Verbosity", diff --git a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml index def3f0dc844..01ffc07bcca 100644 --- a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml +++ b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/after/v1beta1.yaml @@ -57,6 +57,8 @@ logging: options: json: infoBufferSize: "0" + text: + infoBufferSize: "0" verbosity: 0 makeIPTablesUtilChains: true maxOpenFiles: 1000000 diff --git a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml index ca5cf18b983..a2ef32269ae 100644 --- a/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml +++ b/pkg/kubelet/apis/config/scheme/testdata/KubeletConfiguration/roundtrip/default/v1beta1.yaml @@ -57,6 +57,8 @@ logging: options: json: infoBufferSize: "0" + text: + infoBufferSize: "0" verbosity: 0 makeIPTablesUtilChains: true maxOpenFiles: 1000000 diff --git a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml index fdbd9b06af3..faf62d0e922 100644 --- a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml +++ b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/after/v1alpha1.yaml @@ -46,6 +46,8 @@ logging: options: json: infoBufferSize: "0" + text: + infoBufferSize: "0" verbosity: 0 metricsBindAddress: 127.0.0.1:10249 mode: "" diff --git a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml index fdbd9b06af3..faf62d0e922 100644 --- a/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml +++ b/pkg/proxy/apis/config/scheme/testdata/KubeProxyConfiguration/roundtrip/default/v1alpha1.yaml @@ -46,6 +46,8 @@ logging: options: json: infoBufferSize: "0" + text: + infoBufferSize: "0" verbosity: 0 metricsBindAddress: 127.0.0.1:10249 mode: "" diff --git a/staging/src/k8s.io/component-base/logs/api/v1/options.go b/staging/src/k8s.io/component-base/logs/api/v1/options.go index 2db9b1f5382..4abcc1de812 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/options.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/options.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/pflag" "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/validation/field" @@ -188,10 +189,22 @@ func Validate(c *LoggingConfiguration, featureGate featuregate.FeatureGate, fldP func validateFormatOptions(c *LoggingConfiguration, featureGate featuregate.FeatureGate, fldPath *field.Path) field.ErrorList { errs := field.ErrorList{} + errs = append(errs, validateTextOptions(c, featureGate, fldPath.Child("text"))...) errs = append(errs, validateJSONOptions(c, featureGate, fldPath.Child("json"))...) return errs } +func validateTextOptions(c *LoggingConfiguration, featureGate featuregate.FeatureGate, fldPath *field.Path) field.ErrorList { + errs := field.ErrorList{} + if gate := LoggingAlphaOptions; c.Options.Text.SplitStream && !featureEnabled(featureGate, gate) { + errs = append(errs, field.Forbidden(fldPath.Child("splitStream"), fmt.Sprintf("Feature %s is disabled", gate))) + } + if gate := LoggingAlphaOptions; c.Options.Text.InfoBufferSize.Value() != 0 && !featureEnabled(featureGate, gate) { + errs = append(errs, field.Forbidden(fldPath.Child("infoBufferSize"), fmt.Sprintf("Feature %s is disabled", gate))) + } + return errs +} + func validateJSONOptions(c *LoggingConfiguration, featureGate featuregate.FeatureGate, fldPath *field.Path) field.ErrorList { errs := field.ErrorList{} if gate := LoggingAlphaOptions; c.Options.JSON.SplitStream && !featureEnabled(featureGate, gate) { @@ -254,7 +267,14 @@ func apply(c *LoggingConfiguration, options *LoggingOptions, featureGate feature defer setverbositylevel.Mutex.Unlock() setverbositylevel.Callbacks = append(setverbositylevel.Callbacks, control.SetVerbosityLevel) } - klog.SetLoggerWithOptions(log, klog.ContextualLogger(p.ContextualLoggingEnabled), klog.FlushLogger(control.Flush)) + opts := []klog.LoggerOption{ + klog.ContextualLogger(p.ContextualLoggingEnabled), + klog.FlushLogger(control.Flush), + } + if writer, ok := log.GetSink().(textlogger.KlogBufferWriter); ok { + opts = append(opts, klog.WriteKlogBuffer(writer.WriteKlogBuffer)) + } + klog.SetLoggerWithOptions(log, opts...) } if err := loggingFlags.Lookup("v").Value.Set(VerbosityLevelPflag(&c.Verbosity).String()); err != nil { return fmt.Errorf("internal error while setting klog verbosity: %v", err) @@ -346,6 +366,9 @@ func addFlags(c *LoggingConfiguration, fs flagSet) { fs.VarP(VerbosityLevelPflag(&c.Verbosity), "v", "v", "number for the log level verbosity") fs.Var(VModuleConfigurationPflag(&c.VModule), "vmodule", "comma-separated list of pattern=N settings for file-filtered logging (only works for text log format)") + fs.BoolVar(&c.Options.Text.SplitStream, "log-text-split-stream", false, "[Alpha] In text format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use this.") + fs.Var(&c.Options.Text.InfoBufferSize, "log-text-info-buffer-size", "[Alpha] In text format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this.") + // JSON options. We only register them if "json" is a valid format. The // config file API however always has them. if _, err := logRegistry.get("json"); err == nil { @@ -368,16 +391,21 @@ func SetRecommendedLoggingConfiguration(c *LoggingConfiguration) { c.FlushFrequency.Duration.Duration = LogFlushFreqDefault c.FlushFrequency.SerializeAsString = true } + setRecommendedOutputRouting(&c.Options.Text.OutputRoutingOptions) + setRecommendedOutputRouting(&c.Options.JSON.OutputRoutingOptions) +} + +func setRecommendedOutputRouting(o *OutputRoutingOptions) { var empty resource.QuantityValue - if c.Options.JSON.InfoBufferSize == empty { - c.Options.JSON.InfoBufferSize = resource.QuantityValue{ + if o.InfoBufferSize == empty { + o.InfoBufferSize = resource.QuantityValue{ // This is similar, but not quite the same as a default // constructed instance. Quantity: *resource.NewQuantity(0, resource.DecimalSI), } // This sets the unexported Quantity.s which will be compared // by reflect.DeepEqual in some tests. - _ = c.Options.JSON.InfoBufferSize.String() + _ = o.InfoBufferSize.String() } } diff --git a/staging/src/k8s.io/component-base/logs/api/v1/options_test.go b/staging/src/k8s.io/component-base/logs/api/v1/options_test.go index af2ef4eb662..bded29f193f 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/options_test.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/options_test.go @@ -130,10 +130,14 @@ func TestFlagSet(t *testing.T) { // --log-flush-frequency duration Maximum number of seconds between log flushes (default 5s) // -v, --v Level number for the log level verbosity // --vmodule pattern=N,... comma-separated list of pattern=N settings for file-filtered logging (only works for text log format) + // --log-text-split-stream [Alpha] In text format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use this. + // --log-text-info-buffer-size quantity [Alpha] In text format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this. assert.Regexp(t, `^.*--logging-format.*default.*text.* .*--log-flush-frequency.*default 5s.* .*-v.*--v.* .*--vmodule.*pattern=N.* +.*--log-text-split-stream.* +.*--log-text-info-buffer-size quantity.* $`, buffer.String()) }) @@ -151,6 +155,10 @@ $`, buffer.String()) // Expected (Go 1.19): // -log-flush-frequency value // Maximum number of seconds between log flushes (default 5s) + // -log-text-info-buffer-size value + // [Alpha] In text format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this. + // -log-text-split-stream + // [Alpha] In text format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use this. // -logging-format value // Sets the log format. Permitted formats: "text". (default text) // -v value @@ -159,6 +167,10 @@ $`, buffer.String()) // comma-separated list of pattern=N settings for file-filtered logging (only works for text log format) assert.Regexp(t, `^.*-log-flush-frequency.* .*default 5s.* +.*-log-text-info-buffer-size.* +.* +.*-log-text-split-stream.* +.* .*-logging-format.* .*default.*text.* .*-v.* @@ -179,6 +191,10 @@ $`, buffer.String()) // known: // -log-flush-frequency duration // Maximum number of seconds between log flushes (default 5s) + // -log-text-info-buffer-size value + // [Alpha] In text format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this. + // -log-text-split-stream + // [Alpha] In text format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use this. // -logging-format string // Sets the log format. Permitted formats: "text". (default "text") // -v value @@ -187,6 +203,10 @@ $`, buffer.String()) // comma-separated list of pattern=N settings for file-filtered logging (only works for text log format) assert.Regexp(t, `^.*-log-flush-frequency.*duration.* .*default 5s.* +.*-log-text-info-buffer-size.* +.* +.*-log-text-split-stream.* +.* .*-logging-format.*string.* .*default.*text.* .*-v.* diff --git a/staging/src/k8s.io/component-base/logs/api/v1/registry.go b/staging/src/k8s.io/component-base/logs/api/v1/registry.go index 6dc23ec1826..f16c9ce6f1c 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/registry.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/registry.go @@ -79,7 +79,7 @@ func newLogFormatRegistry() *logFormatRegistry { registry: make(map[string]logFormat), frozen: false, } - registry.register("text", logFormat{feature: LoggingStableOptions}) + _ = registry.register(DefaultLogFormat, logFormat{factory: textFactory{}, feature: LoggingStableOptions}) return registry } diff --git a/staging/src/k8s.io/component-base/logs/api/v1/text.go b/staging/src/k8s.io/component-base/logs/api/v1/text.go new file mode 100644 index 00000000000..2983d7d9207 --- /dev/null +++ b/staging/src/k8s.io/component-base/logs/api/v1/text.go @@ -0,0 +1,142 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "bufio" + "fmt" + "io" + "sync" + + "github.com/go-logr/logr" + + "k8s.io/component-base/featuregate" + "k8s.io/klog/v2/textlogger" +) + +// textFactory produces klog text logger instances. +type textFactory struct{} + +var _ LogFormatFactory = textFactory{} + +func (f textFactory) Feature() featuregate.Feature { + return LoggingStableOptions +} + +func (f textFactory) Create(c LoggingConfiguration, o LoggingOptions) (logr.Logger, RuntimeControl) { + output := o.ErrorStream + var flush func() + if c.Options.Text.SplitStream { + r := &klogMsgRouter{ + info: o.InfoStream, + error: o.ErrorStream, + } + size := c.Options.Text.InfoBufferSize.Value() + if size > 0 { + // Prevent integer overflow. + if size > 2*1024*1024*1024 { + size = 2 * 1024 * 1024 * 1024 + } + info := newBufferedWriter(r.info, int(size)) + flush = info.Flush + r.info = info + } + output = r + } + + options := []textlogger.ConfigOption{ + textlogger.Verbosity(int(c.Verbosity)), + textlogger.Output(output), + } + loggerConfig := textlogger.NewConfig(options...) + + // This should never fail, we produce a valid string here. + _ = loggerConfig.VModule().Set(VModuleConfigurationPflag(&c.VModule).String()) + + return textlogger.NewLogger(loggerConfig), + RuntimeControl{ + SetVerbosityLevel: func(v uint32) error { + return loggerConfig.Verbosity().Set(fmt.Sprintf("%d", v)) + }, + Flush: flush, + } +} + +type klogMsgRouter struct { + info, error io.Writer +} + +var _ io.Writer = &klogMsgRouter{} + +// Write redirects the message into either the info or error +// stream, depending on its type as indicated in text format +// by the first byte. +func (r *klogMsgRouter) Write(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + + if p[0] == 'I' { + return r.info.Write(p) + } + return r.error.Write(p) +} + +// bufferedWriter is an io.Writer that buffers writes in-memory before +// flushing them to a wrapped io.Writer after reaching some limit +// or getting flushed. +type bufferedWriter struct { + mu sync.Mutex + writer *bufio.Writer + out io.Writer +} + +func newBufferedWriter(out io.Writer, size int) *bufferedWriter { + return &bufferedWriter{ + writer: bufio.NewWriterSize(out, size), + out: out, + } +} + +func (b *bufferedWriter) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + + // To avoid partial writes into the underlying writer, we ensure that + // the entire new data fits into the buffer or flush first. + if len(p) > b.writer.Available() && b.writer.Buffered() > 0 { + if err := b.writer.Flush(); err != nil { + return 0, err + } + } + + // If it still doesn't fit, then we bypass the now empty buffer + // and write directly. + if len(p) > b.writer.Available() { + return b.out.Write(p) + } + + // This goes into the buffer. + return b.writer.Write(p) +} + +func (b *bufferedWriter) Flush() { + b.mu.Lock() + defer b.mu.Unlock() + + _ = b.writer.Flush() +} diff --git a/staging/src/k8s.io/component-base/logs/api/v1/types.go b/staging/src/k8s.io/component-base/logs/api/v1/types.go index 33becd9d02f..603ccb47404 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/types.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/types.go @@ -94,13 +94,26 @@ func (t *TimeOrMetaDuration) UnmarshalJSON(b []byte) error { // FormatOptions contains options for the different logging formats. type FormatOptions struct { + // [Alpha] Text contains options for logging format "text". + // Only available when the LoggingAlphaOptions feature gate is enabled. + Text TextOptions `json:"text,omitempty"` // [Alpha] JSON contains options for logging format "json". // Only available when the LoggingAlphaOptions feature gate is enabled. JSON JSONOptions `json:"json,omitempty"` } +// TextOptions contains options for logging format "text". +type TextOptions struct { + OutputRoutingOptions `json:",inline"` +} + // JSONOptions contains options for logging format "json". type JSONOptions struct { + OutputRoutingOptions `json:",inline"` +} + +// OutputRoutingOptions contains options that are supported by both "text" and "json". +type OutputRoutingOptions struct { // [Alpha] SplitStream redirects error messages to stderr while // info messages go to stdout, with buffering. The default is to write // both to stdout, without buffering. Only available when diff --git a/staging/src/k8s.io/component-base/logs/api/v1/types_test.go b/staging/src/k8s.io/component-base/logs/api/v1/types_test.go index 99e507f4203..5eb5c9fefea 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/types_test.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/types_test.go @@ -160,6 +160,10 @@ func TestCompatibility(t *testing.T) { {"filePattern": "anotherFile", "verbosity": 1} ], "options": { + "text": { + "splitStream": true, + "infoBufferSize": "2048" + }, "json": { "splitStream": true, "infoBufferSize": "1024" @@ -184,10 +188,20 @@ func TestCompatibility(t *testing.T) { }, }, Options: FormatOptions{ + Text: TextOptions{ + OutputRoutingOptions: OutputRoutingOptions{ + SplitStream: true, + InfoBufferSize: resource.QuantityValue{ + Quantity: *resource.NewQuantity(2048, resource.DecimalSI), + }, + }, + }, JSON: JSONOptions{ - SplitStream: true, - InfoBufferSize: resource.QuantityValue{ - Quantity: *resource.NewQuantity(1024, resource.DecimalSI), + OutputRoutingOptions: OutputRoutingOptions{ + SplitStream: true, + InfoBufferSize: resource.QuantityValue{ + Quantity: *resource.NewQuantity(1024, resource.DecimalSI), + }, }, }, }, @@ -207,6 +221,7 @@ func TestCompatibility(t *testing.T) { } // This sets the internal "s" field just like unmarshaling does. // Required for assert.Equal to pass. + _ = tc.expectConfig.Options.Text.InfoBufferSize.String() _ = tc.expectConfig.Options.JSON.InfoBufferSize.String() assert.Equal(t, tc.expectConfig, config) if tc.expectAllFields { diff --git a/staging/src/k8s.io/component-base/logs/api/v1/validate_test.go b/staging/src/k8s.io/component-base/logs/api/v1/validate_test.go index 10a8d790d31..3d1955b0b2a 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/validate_test.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/validate_test.go @@ -31,9 +31,11 @@ func TestValidation(t *testing.T) { Format: "text", Options: FormatOptions{ JSON: JSONOptions{ - SplitStream: true, - InfoBufferSize: resource.QuantityValue{ - Quantity: *resource.NewQuantity(1024, resource.DecimalSI), + OutputRoutingOptions: OutputRoutingOptions{ + SplitStream: true, + InfoBufferSize: resource.QuantityValue{ + Quantity: *resource.NewQuantity(1024, resource.DecimalSI), + }, }, }, }, diff --git a/staging/src/k8s.io/component-base/logs/api/v1/zz_generated.deepcopy.go b/staging/src/k8s.io/component-base/logs/api/v1/zz_generated.deepcopy.go index e90cbcb3490..0317c80202d 100644 --- a/staging/src/k8s.io/component-base/logs/api/v1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/component-base/logs/api/v1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ package v1 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FormatOptions) DeepCopyInto(out *FormatOptions) { *out = *in + in.Text.DeepCopyInto(&out.Text) in.JSON.DeepCopyInto(&out.JSON) return } @@ -41,7 +42,7 @@ func (in *FormatOptions) DeepCopy() *FormatOptions { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JSONOptions) DeepCopyInto(out *JSONOptions) { *out = *in - in.InfoBufferSize.DeepCopyInto(&out.InfoBufferSize) + in.OutputRoutingOptions.DeepCopyInto(&out.OutputRoutingOptions) return } @@ -78,6 +79,40 @@ func (in *LoggingConfiguration) DeepCopy() *LoggingConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OutputRoutingOptions) DeepCopyInto(out *OutputRoutingOptions) { + *out = *in + in.InfoBufferSize.DeepCopyInto(&out.InfoBufferSize) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OutputRoutingOptions. +func (in *OutputRoutingOptions) DeepCopy() *OutputRoutingOptions { + if in == nil { + return nil + } + out := new(OutputRoutingOptions) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TextOptions) DeepCopyInto(out *TextOptions) { + *out = *in + in.OutputRoutingOptions.DeepCopyInto(&out.OutputRoutingOptions) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TextOptions. +func (in *TextOptions) DeepCopy() *TextOptions { + if in == nil { + return nil + } + out := new(TextOptions) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TimeOrMetaDuration) DeepCopyInto(out *TimeOrMetaDuration) { *out = *in diff --git a/staging/src/k8s.io/component-base/logs/json/register/register_test.go b/staging/src/k8s.io/component-base/logs/json/register/register_test.go index a82bc37f079..5797d5556cf 100644 --- a/staging/src/k8s.io/component-base/logs/json/register/register_test.go +++ b/staging/src/k8s.io/component-base/logs/json/register/register_test.go @@ -140,6 +140,8 @@ func TestJSONFormatRegister(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + state := klog.CaptureState() + defer state.Restore() c := logsapi.NewLoggingConfiguration() fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError) logsapi.AddFlags(c, fs) diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index f7f38822dd4..abc92e55531 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -215,6 +215,10 @@ func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, disca if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil { b.Fatalf("Error setting buffer size: %v", err) } + c.Options.Text.SplitStream = true + if err := c.Options.Text.InfoBufferSize.Set("64Ki"); err != nil { + b.Fatalf("Error setting buffer size: %v", err) + } } var files []*os.File if discard {