component-base: configurable JSON output

This implements the replacement of klog output to different files per level
with optionally splitting JSON output into two streams: one for info messages
on stdout, one for error messages on stderr. The info messages can get buffered
to increase performance. Because stdout and stderr might be merged by the
consumer, the info stream gets flushed before writing an error, to ensure that
the order of messages is preserved.

This also ensures that the following code pattern doesn't leak info messages:
   klog.ErrorS(err, ...)
   os.Exit(1)

Commands explicitly have to flush before exiting via logs.FlushLogs. Most
already do. But buffered info messages can still get lost during an unexpected
program termination, therefore buffering is off by default.

The new options get added to the v1alpha1 LoggingConfiguration with new command
line flags. Because it is an alpha field, changing it inside the v1beta kubelet
config should be okay as long as the fields are clearly marked as alpha.
This commit is contained in:
Patrick Ohly 2021-09-10 18:02:23 +02:00
parent 963d3c122d
commit b22263d835
22 changed files with 369 additions and 35 deletions

View File

@ -188,6 +188,14 @@ var (
"HealthzBindAddress", "HealthzBindAddress",
"HealthzPort", "HealthzPort",
"Logging.Format", "Logging.Format",
"Logging.Options.JSON.InfoBufferSize.Quantity.Format",
"Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.scale",
"Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.unscaled.abs[*]",
"Logging.Options.JSON.InfoBufferSize.Quantity.d.Dec.unscaled.neg",
"Logging.Options.JSON.InfoBufferSize.Quantity.i.scale",
"Logging.Options.JSON.InfoBufferSize.Quantity.i.value",
"Logging.Options.JSON.InfoBufferSize.Quantity.s",
"Logging.Options.JSON.SplitStream",
"Logging.Sanitization", "Logging.Sanitization",
"TLSCipherSuites[*]", "TLSCipherSuites[*]",
"TLSMinVersion", "TLSMinVersion",

View File

@ -54,6 +54,9 @@ kubeAPIBurst: 10
kubeAPIQPS: 5 kubeAPIQPS: 5
logging: logging:
format: text format: text
options:
json:
infoBufferSize: "0"
makeIPTablesUtilChains: true makeIPTablesUtilChains: true
maxOpenFiles: 1000000 maxOpenFiles: 1000000
maxPods: 110 maxPods: 110

View File

@ -54,6 +54,9 @@ kubeAPIBurst: 10
kubeAPIQPS: 5 kubeAPIQPS: 5
logging: logging:
format: text format: text
options:
json:
infoBufferSize: "0"
makeIPTablesUtilChains: true makeIPTablesUtilChains: true
maxOpenFiles: 1000000 maxOpenFiles: 1000000
maxPods: 110 maxPods: 110

View File

@ -280,7 +280,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = make([]string, len(*in)) *out = make([]string, len(*in))
copy(*out, *in) copy(*out, *in)
} }
out.Logging = in.Logging in.Logging.DeepCopyInto(&out.Logging)
out.ShutdownGracePeriod = in.ShutdownGracePeriod out.ShutdownGracePeriod = in.ShutdownGracePeriod
out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods out.ShutdownGracePeriodCriticalPods = in.ShutdownGracePeriodCriticalPods
if in.ReservedMemory != nil { if in.ReservedMemory != nil {

View File

@ -17,6 +17,7 @@ limitations under the License.
package config package config
import ( import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
@ -88,4 +89,25 @@ type LoggingConfiguration struct {
// [Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens). // [Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens).
// Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`) // Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`)
Sanitization bool Sanitization bool
// [Experimental] Options holds additional parameters that are specific
// to the different logging formats. Only the options for the selected
// format get used, but all of them get validated.
Options FormatOptions
}
// FormatOptions contains options for the different logging formats.
type FormatOptions struct {
// [Experimental] JSON contains options for logging format "json".
JSON JSONOptions
}
// JSONOptions contains options for logging format "json".
type JSONOptions struct {
// [Experimental] SplitStream redirects error messages to stderr while
// info messages go to stdout, with buffering. The default is to write
// both to stdout, without buffering.
SplitStream bool
// [Experimental] InfoBufferSize sets the size of the info stream when
// using split streams. The default is zero, which disables buffering.
InfoBufferSize resource.QuantityValue
} }

View File

@ -19,6 +19,7 @@ package v1alpha1
import ( import (
"time" "time"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilpointer "k8s.io/utils/pointer" utilpointer "k8s.io/utils/pointer"
) )
@ -110,4 +111,15 @@ func RecommendedLoggingConfiguration(obj *LoggingConfiguration) {
if obj.Format == "" { if obj.Format == "" {
obj.Format = "text" obj.Format = "text"
} }
var empty resource.QuantityValue
if obj.Options.JSON.InfoBufferSize == empty {
obj.Options.JSON.InfoBufferSize = resource.QuantityValue{
// This is similar, but not quite the same as a default
// constructed instance.
Quantity: *resource.NewQuantity(0, resource.DecimalSI),
}
// This sets the unexported Quantity.s which will be compared
// by reflect.DeepEqual in some tests.
_ = obj.Options.JSON.InfoBufferSize.String()
}
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1 package v1alpha1
import ( import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
@ -90,4 +91,25 @@ type LoggingConfiguration struct {
// [Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens). // [Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens).
// Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`) // Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`)
Sanitization bool `json:"sanitization,omitempty"` Sanitization bool `json:"sanitization,omitempty"`
// [Experimental] Options holds additional parameters that are specific
// to the different logging formats. Only the options for the selected
// format get used, but all of them get validated.
Options FormatOptions `json:"options,omitempty"`
}
// FormatOptions contains options for the different logging formats.
type FormatOptions struct {
// [Experimental] JSON contains options for logging format "json".
JSON JSONOptions `json:"json,omitempty"`
}
// JSONOptions contains options for logging format "json".
type JSONOptions struct {
// [Experimental] SplitStream redirects error messages to stderr while
// info messages go to stdout, with buffering. The default is to write
// both to stdout, without buffering.
SplitStream bool `json:"splitStream,omitempty"`
// [Experimental] InfoBufferSize sets the size of the info stream when
// using split streams. The default is zero, which disables buffering.
InfoBufferSize resource.QuantityValue `json:"infoBufferSize,omitempty"`
} }

View File

@ -35,6 +35,26 @@ func init() {
// RegisterConversions adds conversion functions to the given scheme. // RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes. // Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error { func RegisterConversions(s *runtime.Scheme) error {
if err := s.AddGeneratedConversionFunc((*FormatOptions)(nil), (*config.FormatOptions)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_FormatOptions_To_config_FormatOptions(a.(*FormatOptions), b.(*config.FormatOptions), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.FormatOptions)(nil), (*FormatOptions)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_FormatOptions_To_v1alpha1_FormatOptions(a.(*config.FormatOptions), b.(*FormatOptions), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*JSONOptions)(nil), (*config.JSONOptions)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_JSONOptions_To_config_JSONOptions(a.(*JSONOptions), b.(*config.JSONOptions), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.JSONOptions)(nil), (*JSONOptions)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_JSONOptions_To_v1alpha1_JSONOptions(a.(*config.JSONOptions), b.(*JSONOptions), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*config.ClientConnectionConfiguration)(nil), (*ClientConnectionConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { if err := s.AddConversionFunc((*config.ClientConnectionConfiguration)(nil), (*ClientConnectionConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(a.(*config.ClientConnectionConfiguration), b.(*ClientConnectionConfiguration), scope) return Convert_config_ClientConnectionConfiguration_To_v1alpha1_ClientConnectionConfiguration(a.(*config.ClientConnectionConfiguration), b.(*ClientConnectionConfiguration), scope)
}); err != nil { }); err != nil {
@ -116,6 +136,52 @@ func autoConvert_config_DebuggingConfiguration_To_v1alpha1_DebuggingConfiguratio
return nil return nil
} }
func autoConvert_v1alpha1_FormatOptions_To_config_FormatOptions(in *FormatOptions, out *config.FormatOptions, s conversion.Scope) error {
if err := Convert_v1alpha1_JSONOptions_To_config_JSONOptions(&in.JSON, &out.JSON, s); err != nil {
return err
}
return nil
}
// Convert_v1alpha1_FormatOptions_To_config_FormatOptions is an autogenerated conversion function.
func Convert_v1alpha1_FormatOptions_To_config_FormatOptions(in *FormatOptions, out *config.FormatOptions, s conversion.Scope) error {
return autoConvert_v1alpha1_FormatOptions_To_config_FormatOptions(in, out, s)
}
func autoConvert_config_FormatOptions_To_v1alpha1_FormatOptions(in *config.FormatOptions, out *FormatOptions, s conversion.Scope) error {
if err := Convert_config_JSONOptions_To_v1alpha1_JSONOptions(&in.JSON, &out.JSON, s); err != nil {
return err
}
return nil
}
// Convert_config_FormatOptions_To_v1alpha1_FormatOptions is an autogenerated conversion function.
func Convert_config_FormatOptions_To_v1alpha1_FormatOptions(in *config.FormatOptions, out *FormatOptions, s conversion.Scope) error {
return autoConvert_config_FormatOptions_To_v1alpha1_FormatOptions(in, out, s)
}
func autoConvert_v1alpha1_JSONOptions_To_config_JSONOptions(in *JSONOptions, out *config.JSONOptions, s conversion.Scope) error {
out.SplitStream = in.SplitStream
out.InfoBufferSize = in.InfoBufferSize
return nil
}
// Convert_v1alpha1_JSONOptions_To_config_JSONOptions is an autogenerated conversion function.
func Convert_v1alpha1_JSONOptions_To_config_JSONOptions(in *JSONOptions, out *config.JSONOptions, s conversion.Scope) error {
return autoConvert_v1alpha1_JSONOptions_To_config_JSONOptions(in, out, s)
}
func autoConvert_config_JSONOptions_To_v1alpha1_JSONOptions(in *config.JSONOptions, out *JSONOptions, s conversion.Scope) error {
out.SplitStream = in.SplitStream
out.InfoBufferSize = in.InfoBufferSize
return nil
}
// Convert_config_JSONOptions_To_v1alpha1_JSONOptions is an autogenerated conversion function.
func Convert_config_JSONOptions_To_v1alpha1_JSONOptions(in *config.JSONOptions, out *JSONOptions, s conversion.Scope) error {
return autoConvert_config_JSONOptions_To_v1alpha1_JSONOptions(in, out, s)
}
func autoConvert_v1alpha1_LeaderElectionConfiguration_To_config_LeaderElectionConfiguration(in *LeaderElectionConfiguration, out *config.LeaderElectionConfiguration, s conversion.Scope) error { func autoConvert_v1alpha1_LeaderElectionConfiguration_To_config_LeaderElectionConfiguration(in *LeaderElectionConfiguration, out *config.LeaderElectionConfiguration, s conversion.Scope) error {
if err := v1.Convert_Pointer_bool_To_bool(&in.LeaderElect, &out.LeaderElect, s); err != nil { if err := v1.Convert_Pointer_bool_To_bool(&in.LeaderElect, &out.LeaderElect, s); err != nil {
return err return err
@ -145,11 +211,17 @@ func autoConvert_config_LeaderElectionConfiguration_To_v1alpha1_LeaderElectionCo
func autoConvert_v1alpha1_LoggingConfiguration_To_config_LoggingConfiguration(in *LoggingConfiguration, out *config.LoggingConfiguration, s conversion.Scope) error { func autoConvert_v1alpha1_LoggingConfiguration_To_config_LoggingConfiguration(in *LoggingConfiguration, out *config.LoggingConfiguration, s conversion.Scope) error {
out.Format = in.Format out.Format = in.Format
out.Sanitization = in.Sanitization out.Sanitization = in.Sanitization
if err := Convert_v1alpha1_FormatOptions_To_config_FormatOptions(&in.Options, &out.Options, s); err != nil {
return err
}
return nil return nil
} }
func autoConvert_config_LoggingConfiguration_To_v1alpha1_LoggingConfiguration(in *config.LoggingConfiguration, out *LoggingConfiguration, s conversion.Scope) error { func autoConvert_config_LoggingConfiguration_To_v1alpha1_LoggingConfiguration(in *config.LoggingConfiguration, out *LoggingConfiguration, s conversion.Scope) error {
out.Format = in.Format out.Format = in.Format
out.Sanitization = in.Sanitization out.Sanitization = in.Sanitization
if err := Convert_config_FormatOptions_To_v1alpha1_FormatOptions(&in.Options, &out.Options, s); err != nil {
return err
}
return nil return nil
} }

View File

@ -63,6 +63,40 @@ func (in *DebuggingConfiguration) DeepCopy() *DebuggingConfiguration {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FormatOptions) DeepCopyInto(out *FormatOptions) {
*out = *in
in.JSON.DeepCopyInto(&out.JSON)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FormatOptions.
func (in *FormatOptions) DeepCopy() *FormatOptions {
if in == nil {
return nil
}
out := new(FormatOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JSONOptions) DeepCopyInto(out *JSONOptions) {
*out = *in
in.InfoBufferSize.DeepCopyInto(&out.InfoBufferSize)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JSONOptions.
func (in *JSONOptions) DeepCopy() *JSONOptions {
if in == nil {
return nil
}
out := new(JSONOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) { func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) {
*out = *in *out = *in
@ -90,6 +124,7 @@ func (in *LeaderElectionConfiguration) DeepCopy() *LeaderElectionConfiguration {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LoggingConfiguration) DeepCopyInto(out *LoggingConfiguration) { func (in *LoggingConfiguration) DeepCopyInto(out *LoggingConfiguration) {
*out = *in *out = *in
in.Options.DeepCopyInto(&out.Options)
return return
} }

View File

@ -53,6 +53,40 @@ func (in *DebuggingConfiguration) DeepCopy() *DebuggingConfiguration {
return out return out
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FormatOptions) DeepCopyInto(out *FormatOptions) {
*out = *in
in.JSON.DeepCopyInto(&out.JSON)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FormatOptions.
func (in *FormatOptions) DeepCopy() *FormatOptions {
if in == nil {
return nil
}
out := new(FormatOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JSONOptions) DeepCopyInto(out *JSONOptions) {
*out = *in
in.InfoBufferSize.DeepCopyInto(&out.InfoBufferSize)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JSONOptions.
func (in *JSONOptions) DeepCopy() *JSONOptions {
if in == nil {
return nil
}
out := new(JSONOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) { func (in *LeaderElectionConfiguration) DeepCopyInto(out *LeaderElectionConfiguration) {
*out = *in *out = *in
@ -75,6 +109,7 @@ func (in *LeaderElectionConfiguration) DeepCopy() *LeaderElectionConfiguration {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LoggingConfiguration) DeepCopyInto(out *LoggingConfiguration) { func (in *LoggingConfiguration) DeepCopyInto(out *LoggingConfiguration) {
*out = *in *out = *in
in.Options.DeepCopyInto(&out.Options)
return return
} }

View File

@ -67,6 +67,13 @@ func BindLoggingFlags(c *config.LoggingConfiguration, fs *pflag.FlagSet) {
registry.LogRegistry.Freeze() registry.LogRegistry.Freeze()
fs.BoolVar(&c.Sanitization, "experimental-logging-sanitization", c.Sanitization, `[Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens). fs.BoolVar(&c.Sanitization, "experimental-logging-sanitization", c.Sanitization, `[Experimental] When enabled prevents logging of fields tagged as sensitive (passwords, keys, tokens).
Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`) Runtime log sanitization may introduce significant computation overhead and therefore should not be enabled in production.`)
// JSON options. We only register them if "json" is a valid format. The
// config file API however always has them.
if _, err := registry.LogRegistry.Get("json"); err == nil {
fs.BoolVar(&c.Options.JSON.SplitStream, "log-json-split-stream", false, "[Experimental] In JSON format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout.")
fs.Var(&c.Options.JSON.InfoBufferSize, "log-json-info-buffer-size", "[Experimental] In JSON format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi).")
}
} }
// UnsupportedLoggingFlags lists unsupported logging flags. The normalize // UnsupportedLoggingFlags lists unsupported logging flags. The normalize

View File

@ -25,6 +25,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"k8s.io/component-base/config"
"k8s.io/component-base/logs/registry" "k8s.io/component-base/logs/registry"
) )
@ -33,15 +34,41 @@ var (
timeNow = time.Now timeNow = time.Now
) )
// NewJSONLogger creates a new json logr.Logger using the given Zap Logger to log. // NewJSONLogger creates a new json logr.Logger and its associated
func NewJSONLogger(w zapcore.WriteSyncer) logr.Logger { // flush function. The separate error stream is optional and may be nil.
func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, func()) {
encoder := zapcore.NewJSONEncoder(encoderConfig) encoder := zapcore.NewJSONEncoder(encoderConfig)
// The log level intentionally gets set as low as possible to var core zapcore.Core
// ensure that all messages are printed when this logger gets if errorStream == nil {
// called by klog. The actual verbosity check happens in klog. core = zapcore.NewCore(encoder, zapcore.AddSync(infoStream), zapcore.Level(-127))
core := zapcore.NewCore(encoder, zapcore.AddSync(w), zapcore.Level(-127)) } else {
// Set up writing of error messages to stderr and info messages
// to stdout. Info messages get optionally buffered and flushed
// - through klog.FlushLogs -> zapr Flush -> zap Sync
// - when an error gets logged
//
// The later is important when both streams get merged into a single
// stream by the consumer (same console for a command line tool, pod
// log for a container) because without it, messages get reordered.
flushError := writeWithFlushing{
WriteSyncer: errorStream,
other: infoStream,
}
highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= zapcore.ErrorLevel
})
lowPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl < zapcore.ErrorLevel
})
core = zapcore.NewTee(
zapcore.NewCore(encoder, flushError, highPriority),
zapcore.NewCore(encoder, infoStream, lowPriority),
)
}
l := zap.New(core, zap.WithCaller(true)) l := zap.New(core, zap.WithCaller(true))
return zapr.NewLoggerWithOptions(l, zapr.LogInfoLevel("v"), zapr.ErrorKey("err")) return zapr.NewLoggerWithOptions(l, zapr.LogInfoLevel("v"), zapr.ErrorKey("err")), func() {
l.Sync()
}
} }
var encoderConfig = zapcore.EncoderConfig{ var encoderConfig = zapcore.EncoderConfig{
@ -62,8 +89,36 @@ func epochMillisTimeEncoder(_ time.Time, enc zapcore.PrimitiveArrayEncoder) {
// Factory produces JSON logger instances. // Factory produces JSON logger instances.
type Factory struct{} type Factory struct{}
func (f Factory) Create() logr.Logger { var _ registry.LogFormatFactory = Factory{}
return NewJSONLogger(zapcore.Lock(os.Stdout))
func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) {
if options.JSON.SplitStream {
infoStream := zapcore.Lock(os.Stdout)
size := options.JSON.InfoBufferSize.Value()
if size > 0 {
// Prevent integer overflow.
if size > 2*1024*1024*1024 {
size = 2 * 1024 * 1024 * 1024
}
infoStream = &zapcore.BufferedWriteSyncer{
WS: infoStream,
Size: int(size),
}
}
return NewJSONLogger(infoStream, zapcore.Lock(os.Stderr))
}
out := zapcore.Lock(os.Stdout)
return NewJSONLogger(out, out)
} }
var _ registry.LogFormatFactory = Factory{} // writeWithFlushing is a wrapper around an output stream which flushes another
// output stream before each write.
type writeWithFlushing struct {
zapcore.WriteSyncer
other zapcore.WriteSyncer
}
func (f writeWithFlushing) Write(bs []byte) (int, error) {
f.other.Sync()
return f.WriteSyncer.Write(bs)
}

View File

@ -23,8 +23,10 @@ import (
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
var writer = zapcore.AddSync(&writeSyncer{})
func BenchmarkInfoLoggerInfo(b *testing.B) { func BenchmarkInfoLoggerInfo(b *testing.B) {
logger := NewJSONLogger(zapcore.AddSync(&writeSyncer{})) logger, _ := NewJSONLogger(writer, writer)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
@ -53,7 +55,7 @@ func BenchmarkInfoLoggerInfo(b *testing.B) {
} }
func BenchmarkZapLoggerError(b *testing.B) { func BenchmarkZapLoggerError(b *testing.B) {
logger := NewJSONLogger(zapcore.AddSync(&writeSyncer{})) logger, _ := NewJSONLogger(writer, writer)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
@ -83,7 +85,7 @@ func BenchmarkZapLoggerError(b *testing.B) {
} }
func BenchmarkZapLoggerV(b *testing.B) { func BenchmarkZapLoggerV(b *testing.B) {
logger := NewJSONLogger(zapcore.AddSync(&writeSyncer{})) logger, _ := NewJSONLogger(writer, writer)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {

View File

@ -17,7 +17,6 @@ limitations under the License.
package logs package logs
import ( import (
"bufio"
"bytes" "bytes"
"fmt" "fmt"
"strings" "strings"
@ -64,10 +63,9 @@ func TestZapLoggerInfo(t *testing.T) {
for _, data := range testDataInfo { for _, data := range testDataInfo {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer) writer := zapcore.AddSync(&buffer)
var sampleInfoLogger = NewJSONLogger(zapcore.AddSync(writer)) sampleInfoLogger, _ := NewJSONLogger(writer, nil)
sampleInfoLogger.Info(data.msg, data.keysValues...) sampleInfoLogger.Info(data.msg, data.keysValues...)
writer.Flush()
logStr := buffer.String() logStr := buffer.String()
logStrLines := strings.Split(logStr, "\n") logStrLines := strings.Split(logStr, "\n")
@ -96,7 +94,7 @@ func TestZapLoggerInfo(t *testing.T) {
// TestZapLoggerEnabled test ZapLogger enabled // TestZapLoggerEnabled test ZapLogger enabled
func TestZapLoggerEnabled(t *testing.T) { func TestZapLoggerEnabled(t *testing.T) {
var sampleInfoLogger = NewJSONLogger(nil) sampleInfoLogger, _ := NewJSONLogger(nil, nil)
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
if !sampleInfoLogger.V(i).Enabled() { if !sampleInfoLogger.V(i).Enabled() {
t.Errorf("V(%d).Info should be enabled", i) t.Errorf("V(%d).Info should be enabled", i)
@ -112,10 +110,9 @@ func TestZapLoggerV(t *testing.T) {
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer) writer := zapcore.AddSync(&buffer)
var sampleInfoLogger = NewJSONLogger(zapcore.AddSync(writer)) sampleInfoLogger, _ := NewJSONLogger(writer, nil)
sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond) sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond)
writer.Flush()
logStr := buffer.String() logStr := buffer.String()
var v, lineNo int var v, lineNo int
expectFormat := "{\"ts\":0.000123,\"caller\":\"json/json_test.go:%d\",\"msg\":\"test\",\"v\":%d,\"ns\":\"default\",\"podnum\":2,\"time\":\"1µs\"}\n" expectFormat := "{\"ts\":0.000123,\"caller\":\"json/json_test.go:%d\",\"msg\":\"test\",\"v\":%d,\"ns\":\"default\",\"podnum\":2,\"time\":\"1µs\"}\n"
@ -137,13 +134,12 @@ func TestZapLoggerV(t *testing.T) {
// TestZapLoggerError test ZapLogger json error format // TestZapLoggerError test ZapLogger json error format
func TestZapLoggerError(t *testing.T) { func TestZapLoggerError(t *testing.T) {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer) writer := zapcore.AddSync(&buffer)
timeNow = func() time.Time { timeNow = func() time.Time {
return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC) return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC)
} }
var sampleInfoLogger = NewJSONLogger(zapcore.AddSync(writer)) sampleInfoLogger, _ := NewJSONLogger(writer, nil)
sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond) sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond)
writer.Flush()
logStr := buffer.String() logStr := buffer.String()
var ts float64 var ts float64
var lineNo int var lineNo int
@ -158,6 +154,38 @@ func TestZapLoggerError(t *testing.T) {
} }
} }
func TestZapLoggerStreams(t *testing.T) {
var infoBuffer, errorBuffer bytes.Buffer
log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer))
log.Error(fmt.Errorf("some error"), "failed")
log.Info("hello world")
logStr := errorBuffer.String()
var ts float64
var lineNo int
expectFormat := `{"ts":%f,"caller":"json/json_test.go:%d","msg":"failed","err":"some error"}`
n, err := fmt.Sscanf(logStr, expectFormat, &ts, &lineNo)
if n != 2 || err != nil {
t.Errorf("error log format error: %d elements, error %s:\n%s", n, err, logStr)
}
expect := fmt.Sprintf(expectFormat, ts, lineNo)
if !assert.JSONEq(t, expect, logStr) {
t.Errorf("error log has wrong format \n expect:%s\n got:%s", expect, logStr)
}
logStr = infoBuffer.String()
expectFormat = `{"ts":%f,"caller":"json/json_test.go:%d","msg":"hello world","v":0}`
n, err = fmt.Sscanf(logStr, expectFormat, &ts, &lineNo)
if n != 2 || err != nil {
t.Errorf("info log format error: %d elements, error %s:\n%s", n, err, logStr)
}
expect = fmt.Sprintf(expectFormat, ts, lineNo)
if !assert.JSONEq(t, expect, logStr) {
t.Errorf("info has wrong format \n expect:%s\n got:%s", expect, logStr)
}
}
type testBuff struct { type testBuff struct {
writeCount int writeCount int
} }

View File

@ -206,7 +206,8 @@ func TestKlogIntegration(t *testing.T) {
for _, tc := range tcs { for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
var buffer bytes.Buffer var buffer bytes.Buffer
var logger = NewJSONLogger(zapcore.AddSync(&buffer)) writer := zapcore.AddSync(&buffer)
logger, _ := NewJSONLogger(writer, writer)
klog.SetLogger(logger) klog.SetLogger(logger)
defer klog.ClearLogger() defer klog.ClearLogger()
@ -236,7 +237,8 @@ func TestKlogIntegration(t *testing.T) {
// TestKlogV test klog -v(--verbose) func available with json logger // TestKlogV test klog -v(--verbose) func available with json logger
func TestKlogV(t *testing.T) { func TestKlogV(t *testing.T) {
var buffer testBuff var buffer testBuff
logger := NewJSONLogger(&buffer) writer := zapcore.AddSync(&buffer)
logger, _ := NewJSONLogger(writer, writer)
klog.SetLogger(logger) klog.SetLogger(logger)
defer klog.ClearLogger() defer klog.ClearLogger()
fs := flag.FlagSet{} fs := flag.FlagSet{}

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/component-base/config" "k8s.io/component-base/config"
"k8s.io/component-base/logs" "k8s.io/component-base/logs"
@ -42,6 +43,14 @@ func TestJSONFlag(t *testing.T) {
} }
func TestJSONFormatRegister(t *testing.T) { func TestJSONFormatRegister(t *testing.T) {
defaultOptions := config.FormatOptions{
JSON: config.JSONOptions{
InfoBufferSize: resource.QuantityValue{
Quantity: *resource.NewQuantity(0, resource.DecimalSI),
},
},
}
_ = defaultOptions.JSON.InfoBufferSize.String()
testcases := []struct { testcases := []struct {
name string name string
args []string args []string
@ -53,7 +62,8 @@ func TestJSONFormatRegister(t *testing.T) {
args: []string{"--logging-format=json"}, args: []string{"--logging-format=json"},
want: &logs.Options{ want: &logs.Options{
Config: config.LoggingConfiguration{ Config: config.LoggingConfiguration{
Format: logs.JSONLogFormat, Format: logs.JSONLogFormat,
Options: defaultOptions,
}, },
}, },
}, },
@ -62,7 +72,8 @@ func TestJSONFormatRegister(t *testing.T) {
args: []string{"--logging-format=test"}, args: []string{"--logging-format=test"},
want: &logs.Options{ want: &logs.Options{
Config: config.LoggingConfiguration{ Config: config.LoggingConfiguration{
Format: "test", Format: "test",
Options: defaultOptions,
}, },
}, },
errs: field.ErrorList{&field.Error{ errs: field.ErrorList{&field.Error{

View File

@ -42,6 +42,7 @@ const deprecated = "will be removed in a future release, see https://github.com/
var ( var (
packageFlags = flag.NewFlagSet("logging", flag.ContinueOnError) packageFlags = flag.NewFlagSet("logging", flag.ContinueOnError)
logFlushFreq time.Duration logFlushFreq time.Duration
logrFlush func()
) )
func init() { func init() {
@ -128,6 +129,9 @@ func InitLogs() {
// are printed before exiting the program. // are printed before exiting the program.
func FlushLogs() { func FlushLogs() {
klog.Flush() klog.Flush()
if logrFlush != nil {
logrFlush()
}
} }
// NewLogger creates a new log.Logger which sends logs to klog.Info. // NewLogger creates a new log.Logger which sends logs to klog.Info.

View File

@ -63,7 +63,9 @@ func (o *Options) Apply() {
if factory == nil { if factory == nil {
klog.ClearLogger() klog.ClearLogger()
} else { } else {
klog.SetLogger(factory.Create()) log, flush := factory.Create(o.Config.Options)
klog.SetLogger(log)
logrFlush = flush
} }
if o.Config.Sanitization { if o.Config.Sanitization {
klog.SetLogFilter(&sanitization.SanitizingFilter{}) klog.SetLogFilter(&sanitization.SanitizingFilter{})

View File

@ -68,6 +68,7 @@ func TestOptions(t *testing.T) {
Config: config.LoggingConfiguration{ Config: config.LoggingConfiguration{
Format: DefaultLogFormat, Format: DefaultLogFormat,
Sanitization: true, Sanitization: true,
Options: NewOptions().Config.Options,
}, },
}, },
}, },
@ -76,7 +77,8 @@ func TestOptions(t *testing.T) {
args: []string{"--logging-format=test"}, args: []string{"--logging-format=test"},
want: &Options{ want: &Options{
Config: config.LoggingConfiguration{ Config: config.LoggingConfiguration{
Format: "test", Format: "test",
Options: NewOptions().Config.Options,
}, },
}, },
errs: field.ErrorList{&field.Error{ errs: field.ErrorList{&field.Error{

View File

@ -21,6 +21,8 @@ import (
"sort" "sort"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"k8s.io/component-base/config"
) )
// LogRegistry is new init LogFormatRegistry struct // LogRegistry is new init LogFormatRegistry struct
@ -35,8 +37,11 @@ type LogFormatRegistry struct {
// LogFormatFactory provides support for a certain additional, // LogFormatFactory provides support for a certain additional,
// non-default log format. // non-default log format.
type LogFormatFactory interface { type LogFormatFactory interface {
// Create returns a logger. // Create returns a logger with the requested configuration.
Create() logr.Logger // Returning a flush function for the logger is optional.
// If provided, the caller must ensure that it is called
// periodically (if desired) and at program exit.
Create(options config.FormatOptions) (log logr.Logger, flush func())
} }
// NewLogFormatRegistry return new init LogFormatRegistry struct // NewLogFormatRegistry return new init LogFormatRegistry struct

View File

@ -37,8 +37,12 @@ func ValidateLoggingConfiguration(c *config.LoggingConfiguration, fldPath *field
} }
} }
} }
if _, err := registry.LogRegistry.Get(c.Format); err != nil { _, err := registry.LogRegistry.Get(c.Format)
if err != nil {
errs = append(errs, field.Invalid(fldPath.Child("format"), c.Format, "Unsupported log format")) errs = append(errs, field.Invalid(fldPath.Child("format"), c.Format, "Unsupported log format"))
} }
// Currently nothing to validate for c.Options.
return errs return errs
} }

View File

@ -310,7 +310,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = make([]string, len(*in)) *out = make([]string, len(*in))
copy(*out, *in) copy(*out, *in)
} }
out.Logging = in.Logging in.Logging.DeepCopyInto(&out.Logging)
if in.EnableSystemLogHandler != nil { if in.EnableSystemLogHandler != nil {
in, out := &in.EnableSystemLogHandler, &out.EnableSystemLogHandler in, out := &in.EnableSystemLogHandler, &out.EnableSystemLogHandler
*out = new(bool) *out = new(bool)