Merge pull request #106594 from pohly/log-benchmark

logs: add benchmark
This commit is contained in:
Kubernetes Prow Robot 2022-01-12 18:01:08 -08:00 committed by GitHub
commit dad0c48959
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 998 additions and 25 deletions

1
go.mod
View File

@ -83,6 +83,7 @@ require (
go.opentelemetry.io/otel/sdk v0.20.0 go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0 go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/proto/otlp v0.7.0 go.opentelemetry.io/proto/otlp v0.7.0
go.uber.org/zap v1.19.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 golang.org/x/net v0.0.0-20211209124913-491a49abca63

View File

@ -17,6 +17,7 @@ limitations under the License.
package logs package logs
import ( import (
"io"
"os" "os"
"time" "time"
@ -36,8 +37,20 @@ var (
// NewJSONLogger creates a new json logr.Logger and its associated // NewJSONLogger creates a new json logr.Logger and its associated
// flush function. The separate error stream is optional and may be nil. // flush function. The separate error stream is optional and may be nil.
func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, func()) { // The encoder config is also optional.
encoder := zapcore.NewJSONEncoder(encoderConfig) func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer, encoderConfig *zapcore.EncoderConfig) (logr.Logger, func()) {
if encoderConfig == nil {
encoderConfig = &zapcore.EncoderConfig{
MessageKey: "msg",
CallerKey: "caller",
TimeKey: "ts",
EncodeTime: epochMillisTimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
}
encoder := zapcore.NewJSONEncoder(*encoderConfig)
var core zapcore.Core var core zapcore.Core
if errorStream == nil { if errorStream == nil {
core = zapcore.NewCore(encoder, infoStream, zapcore.Level(-127)) core = zapcore.NewCore(encoder, infoStream, zapcore.Level(-127))
@ -59,15 +72,6 @@ func NewJSONLogger(infoStream, errorStream zapcore.WriteSyncer) (logr.Logger, fu
} }
} }
var encoderConfig = zapcore.EncoderConfig{
MessageKey: "msg",
CallerKey: "caller",
TimeKey: "ts",
EncodeTime: epochMillisTimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
func epochMillisTimeEncoder(_ time.Time, enc zapcore.PrimitiveArrayEncoder) { func epochMillisTimeEncoder(_ time.Time, enc zapcore.PrimitiveArrayEncoder) {
nanos := timeNow().UnixNano() nanos := timeNow().UnixNano()
millis := float64(nanos) / float64(time.Millisecond) millis := float64(nanos) / float64(time.Millisecond)
@ -80,9 +84,17 @@ type Factory struct{}
var _ registry.LogFormatFactory = Factory{} var _ registry.LogFormatFactory = Factory{}
func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) { func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) {
stderr := zapcore.Lock(os.Stderr) // We intentionally avoid all os.File.Sync calls. Output is unbuffered,
// therefore we don't need to flush, and calling the underlying fsync
// would just slow down writing.
//
// The assumption is that logging only needs to ensure that data gets
// written to the output stream before the process terminates, but
// doesn't need to worry about data not being written because of a
// system crash or powerloss.
stderr := zapcore.Lock(AddNopSync(os.Stderr))
if options.JSON.SplitStream { if options.JSON.SplitStream {
stdout := zapcore.Lock(os.Stdout) stdout := zapcore.Lock(AddNopSync(os.Stdout))
size := options.JSON.InfoBufferSize.Value() size := options.JSON.InfoBufferSize.Value()
if size > 0 { if size > 0 {
// Prevent integer overflow. // Prevent integer overflow.
@ -95,8 +107,21 @@ func (f Factory) Create(options config.FormatOptions) (logr.Logger, func()) {
} }
} }
// stdout for info messages, stderr for errors. // stdout for info messages, stderr for errors.
return NewJSONLogger(stdout, stderr) return NewJSONLogger(stdout, stderr, nil)
} }
// Write info messages and errors to stderr to prevent mixing with normal program output. // Write info messages and errors to stderr to prevent mixing with normal program output.
return NewJSONLogger(stderr, nil) return NewJSONLogger(stderr, nil, nil)
}
// AddNopSync wraps writer in a zapcore.WriteSyncer whose Sync method does
// nothing and always returns nil. Writes are passed through to writer
// unchanged.
func AddNopSync(writer io.Writer) zapcore.WriteSyncer {
return nopSync{Writer: writer}
}
// nopSync embeds an io.Writer so that its Write method is promoted; the
// Sync method that completes the WriteSyncer interface is defined separately.
type nopSync struct {
io.Writer
}
func (f nopSync) Sync() error {
return nil
} }

View File

@ -26,7 +26,7 @@ import (
var writer = zapcore.AddSync(&writeSyncer{}) var writer = zapcore.AddSync(&writeSyncer{})
func BenchmarkInfoLoggerInfo(b *testing.B) { func BenchmarkInfoLoggerInfo(b *testing.B) {
logger, _ := NewJSONLogger(writer, writer) logger, _ := NewJSONLogger(writer, nil, nil)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
@ -55,7 +55,7 @@ func BenchmarkInfoLoggerInfo(b *testing.B) {
} }
func BenchmarkZapLoggerError(b *testing.B) { func BenchmarkZapLoggerError(b *testing.B) {
logger, _ := NewJSONLogger(writer, writer) logger, _ := NewJSONLogger(writer, nil, nil)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
@ -85,7 +85,7 @@ func BenchmarkZapLoggerError(b *testing.B) {
} }
func BenchmarkZapLoggerV(b *testing.B) { func BenchmarkZapLoggerV(b *testing.B) {
logger, _ := NewJSONLogger(writer, writer) logger, _ := NewJSONLogger(writer, nil, nil)
b.ResetTimer() b.ResetTimer()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {

View File

@ -64,7 +64,7 @@ func TestZapLoggerInfo(t *testing.T) {
for _, data := range testDataInfo { for _, data := range testDataInfo {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := zapcore.AddSync(&buffer) writer := zapcore.AddSync(&buffer)
sampleInfoLogger, _ := NewJSONLogger(writer, nil) sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil)
sampleInfoLogger.Info(data.msg, data.keysValues...) sampleInfoLogger.Info(data.msg, data.keysValues...)
logStr := buffer.String() logStr := buffer.String()
@ -94,7 +94,7 @@ func TestZapLoggerInfo(t *testing.T) {
// TestZapLoggerEnabled test ZapLogger enabled // TestZapLoggerEnabled test ZapLogger enabled
func TestZapLoggerEnabled(t *testing.T) { func TestZapLoggerEnabled(t *testing.T) {
sampleInfoLogger, _ := NewJSONLogger(nil, nil) sampleInfoLogger, _ := NewJSONLogger(nil, nil, nil)
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
if !sampleInfoLogger.V(i).Enabled() { if !sampleInfoLogger.V(i).Enabled() {
t.Errorf("V(%d).Info should be enabled", i) t.Errorf("V(%d).Info should be enabled", i)
@ -111,7 +111,7 @@ func TestZapLoggerV(t *testing.T) {
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := zapcore.AddSync(&buffer) writer := zapcore.AddSync(&buffer)
sampleInfoLogger, _ := NewJSONLogger(writer, nil) sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil)
sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond) sampleInfoLogger.V(i).Info("test", "ns", "default", "podnum", 2, "time", time.Microsecond)
logStr := buffer.String() logStr := buffer.String()
var v, lineNo int var v, lineNo int
@ -138,7 +138,7 @@ func TestZapLoggerError(t *testing.T) {
timeNow = func() time.Time { timeNow = func() time.Time {
return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC) return time.Date(1970, time.January, 1, 0, 0, 0, 123, time.UTC)
} }
sampleInfoLogger, _ := NewJSONLogger(writer, nil) sampleInfoLogger, _ := NewJSONLogger(writer, nil, nil)
sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond) sampleInfoLogger.Error(fmt.Errorf("invalid namespace:%s", "default"), "wrong namespace", "ns", "default", "podnum", 2, "time", time.Microsecond)
logStr := buffer.String() logStr := buffer.String()
var ts float64 var ts float64
@ -156,7 +156,7 @@ func TestZapLoggerError(t *testing.T) {
func TestZapLoggerStreams(t *testing.T) { func TestZapLoggerStreams(t *testing.T) {
var infoBuffer, errorBuffer bytes.Buffer var infoBuffer, errorBuffer bytes.Buffer
log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer)) log, _ := NewJSONLogger(zapcore.AddSync(&infoBuffer), zapcore.AddSync(&errorBuffer), nil)
log.Error(fmt.Errorf("some error"), "failed") log.Error(fmt.Errorf("some error"), "failed")
log.Info("hello world") log.Info("hello world")

View File

@ -239,7 +239,7 @@ func TestKlogIntegration(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
var buffer bytes.Buffer var buffer bytes.Buffer
writer := zapcore.AddSync(&buffer) writer := zapcore.AddSync(&buffer)
logger, _ := NewJSONLogger(writer, writer) logger, _ := NewJSONLogger(writer, nil, nil)
klog.SetLogger(logger) klog.SetLogger(logger)
defer klog.ClearLogger() defer klog.ClearLogger()
@ -270,7 +270,7 @@ func TestKlogIntegration(t *testing.T) {
func TestKlogV(t *testing.T) { func TestKlogV(t *testing.T) {
var buffer testBuff var buffer testBuff
writer := zapcore.AddSync(&buffer) writer := zapcore.AddSync(&buffer)
logger, _ := NewJSONLogger(writer, writer) logger, _ := NewJSONLogger(writer, nil, nil)
klog.SetLogger(logger) klog.SetLogger(logger)
defer klog.ClearLogger() defer klog.ClearLogger()
fs := flag.FlagSet{} fs := flag.FlagSet{}

View File

@ -0,0 +1,11 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-instrumentation-approvers
- serathius
- pohly
reviewers:
- sig-instrumentation-reviewers
labels:
- sig/instrumentation
- wg/structured-logging

View File

@ -0,0 +1,43 @@
# Benchmarking logging
Any major changes to the logging code, whether it is in Kubernetes or in klog,
must be benchmarked before and after the change.
## Running the benchmark
```
$ go test -bench=. -test.benchmem -benchmem .
```
## Real log data
The files under `data` define test cases for specific aspects of formatting. To
test with a log file that represents output under some kind of real load, copy
the log file into `data/<file name>.log` and run benchmarking as described
above. `-bench=BenchmarkEncoding/<file name without .log suffix>` can be used
to benchmark just the new file.
When using `data/v<some number>/<file name>.log`, formatting will be done at
that log level. Symlinks can be created to simulate writing of the same log
data at different levels.
No such real log files are included in the Kubernetes repo because of their size.
They can be found in the "artifacts" of this
https://testgrid.kubernetes.io/sig-instrumentation-tests#kind-json-logging-master
Prow job:
- `artifacts/logs/kind-control-plane/containers`
- `artifacts/logs/kind-*/kubelet.log`
With sufficient credentials, `gsutil` can be used to download everything for a job with:
```
gsutil -m cp -R gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/<job ID> .
```
## Analyzing log data
While loading a file, some statistics about it are collected. Those are shown
when running with:
```
$ go test -v -bench=. -test.benchmem -benchmem .
```

View File

@ -0,0 +1,296 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"errors"
"flag"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/go-logr/logr"
"k8s.io/component-base/logs"
logsjson "k8s.io/component-base/logs/json"
"k8s.io/klog/v2"
)
// verbosityDirRE extracts the verbosity threshold from a "/v<number>/"
// directory component of a slash-separated log file path. It is compiled
// once instead of on every sub-benchmark invocation.
var verbosityDirRE = regexp.MustCompile(`/v(\d+)/`)

// BenchmarkEncoding replays the messages of every log file found under
// "data" through the printf-style, structured text and JSON code paths and
// records how many bytes each of them produced per iteration.
func BenchmarkEncoding(b *testing.B) {
	// Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log
	// messages. We generate one sub-benchmark for each file where logging
	// is tested with the log level from the directory. Symlinks can be
	// used to test the same file with different levels. filepath.Walk
	// does not follow symbolic links, so only links to individual files
	// are picked up, not links to directories.
	if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !strings.HasSuffix(path, ".log") {
			return nil
		}
		messages, stats, err := loadLog(path)
		if err != nil {
			return err
		}
		// Statistics for a symlink would just repeat those of its target.
		if info.Mode()&fs.ModeSymlink == 0 {
			b.Log(path + "\n" + stats.String())
		}

		// Benchmark names and the verbosity directory are matched with
		// forward slashes, independent of the OS path separator.
		slashPath := filepath.ToSlash(path)
		b.Run(strings.TrimSuffix(strings.TrimPrefix(slashPath, "data/"), ".log"), func(b *testing.B) {
			// Take verbosity threshold from directory, if present.
			v := 0
			if vMatch := verbosityDirRE.FindStringSubmatch(slashPath); vMatch != nil {
				level, err := strconv.Atoi(vMatch[1])
				if err != nil {
					b.Fatalf("invalid verbosity directory in %q: %v", slashPath, err)
				}
				v = level
			}
			fileSizes := map[string]int{}
			b.Run("stats", func(b *testing.B) {
				// Nothing to do. Use this for "go test -v
				// -bench=BenchmarkEncoding/.*/stats" to print
				// just the statistics.
			})
			b.Run("printf", func(b *testing.B) {
				b.ResetTimer()
				output = 0
				for i := 0; i < b.N; i++ {
					for _, item := range messages {
						if item.verbosity <= v {
							printf(item)
						}
					}
				}
				fileSizes["printf"] = int(output) / b.N
			})
			b.Run("structured", func(b *testing.B) {
				b.ResetTimer()
				output = 0
				for i := 0; i < b.N; i++ {
					for _, item := range messages {
						if item.verbosity <= v {
							prints(item)
						}
					}
				}
				fileSizes["structured"] = int(output) / b.N
			})
			b.Run("JSON", func(b *testing.B) {
				// Same calls as "structured", but routed through
				// the JSON logger for the duration of this run.
				klog.SetLogger(jsonLogger)
				defer klog.ClearLogger()
				b.ResetTimer()
				output = 0
				for i := 0; i < b.N; i++ {
					for _, item := range messages {
						if item.verbosity <= v {
							prints(item)
						}
					}
				}
				fileSizes["JSON"] = int(output) / b.N
			})

			b.Log(fmt.Sprintf("file sizes: %v\n", fileSizes))
		})
		return nil
	}); err != nil {
		b.Fatalf("reading 'data' directory: %v", err)
	}
}
// loadGeneratorConfig describes the synthetic log stream that the writing
// benchmark produces.
type loadGeneratorConfig struct {
	messageLength   int     // length of the message text of every log entry
	errorPercentage float64 // share of entries, in percent, logged as errors
	workers         int     // number of goroutines emitting entries concurrently
}
// BenchmarkWriting simulates writing of a stream which mixes info and error log
// messages at a certain ratio. In contrast to BenchmarkEncoding, this stresses
// the output handling and includes the usual additional information (caller,
// time stamp).
//
// See https://github.com/kubernetes/kubernetes/issues/107029 for the
// motivation.
func BenchmarkWriting(b *testing.B) {
	// Headers are part of what is measured here. A failure to toggle the
	// flag would silently benchmark the wrong output format, so it is
	// reported instead of being ignored.
	if err := flag.Set("skip_headers", "false"); err != nil {
		b.Fatalf("enabling log headers: %v", err)
	}
	defer func() {
		// Restore the setting that the other benchmarks rely on.
		if err := flag.Set("skip_headers", "true"); err != nil {
			b.Errorf("disabling log headers again: %v", err)
		}
	}()

	// This could be made configurable and/or we could benchmark different
	// configurations automatically.
	config := loadGeneratorConfig{
		messageLength:   300,
		errorPercentage: 1.0,
		workers:         100,
	}

	benchmarkWriting(b, config)
}
// benchmarkWriting runs the output format benchmarks twice: once with all
// output discarded ("discard") and once with output going to temporary files
// ("tmp-files").
func benchmarkWriting(b *testing.B, config loadGeneratorConfig) {
	modes := []struct {
		name    string
		discard bool
	}{
		{name: "discard", discard: true},
		{name: "tmp-files", discard: false},
	}
	for _, mode := range modes {
		mode := mode
		b.Run(mode.name, func(b *testing.B) {
			benchmarkOutputFormats(b, config, mode.discard)
		})
	}
}
// benchmarkOutputFormats generates the configured load once per output
// format: klog's structured text format and JSON with one or two output
// streams. With discard set, everything is written to the byte-counting
// output sink; otherwise it goes to files in a temporary directory.
func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) {
	tmpDir := b.TempDir()

	// createLog creates (or truncates) a log file in the temporary
	// directory. The file is closed again when the benchmark run which
	// asked for it is complete. Creating the file per run ensures that
	// every run starts with an empty file at offset zero and that no
	// file handle is leaked when a benchmark function is invoked
	// repeatedly with growing b.N.
	createLog := func(b *testing.B, name string) *os.File {
		file, err := os.Create(path.Join(tmpDir, name))
		if err != nil {
			b.Fatal(err)
		}
		b.Cleanup(func() {
			if err := file.Close(); err != nil {
				b.Errorf("closing %s: %v", name, err)
			}
		})
		return file
	}

	b.Run("structured", func(b *testing.B) {
		var out *os.File
		if !discard {
			out = createLog(b, "all.log")
			klog.SetOutput(out)
			// Runs before the cleanup above closes the file.
			defer klog.SetOutput(&output)
		}
		generateOutput(b, config, nil, out)
	})
	b.Run("JSON", func(b *testing.B) {
		b.Run("single-stream", func(b *testing.B) {
			var logger logr.Logger
			var flush func()
			var out1 *os.File
			if discard {
				logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), nil, nil)
			} else {
				out1 = createLog(b, "stream-1.log")
				// The factory picks up os.Stderr when it is
				// called, so redirect it temporarily.
				stderr := os.Stderr
				os.Stderr = out1
				defer func() {
					os.Stderr = stderr
				}()
				options := logs.NewOptions()
				logger, flush = logsjson.Factory{}.Create(options.Config.Options)
			}
			klog.SetLogger(logger)
			defer klog.ClearLogger()
			generateOutput(b, config, flush, out1)
		})

		b.Run("split-stream", func(b *testing.B) {
			var logger logr.Logger
			var flush func()
			var out1, out2 *os.File
			if discard {
				logger, flush = logsjson.NewJSONLogger(logsjson.AddNopSync(&output), logsjson.AddNopSync(&output), nil)
			} else {
				out1 = createLog(b, "stream-1.log")
				out2 = createLog(b, "stream-2.log")
				stdout, stderr := os.Stdout, os.Stderr
				os.Stdout, os.Stderr = out1, out2
				defer func() {
					os.Stdout, os.Stderr = stdout, stderr
				}()
				options := logs.NewOptions()
				options.Config.Options.JSON.SplitStream = true
				logger, flush = logsjson.Factory{}.Create(options.Config.Options)
			}
			klog.SetLogger(logger)
			defer klog.ClearLogger()
			generateOutput(b, config, flush, out1, out2)
		})
	})
}
// generateOutput lets config.workers goroutines emit b.N*1000 log entries
// each through klog, then flushes klog and, if flush is not nil, calls flush.
// Afterwards it logs the achieved entry rate and, for every non-nil file,
// the number of bytes that were added during this call together with the
// first bytes of that new content.
func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) {
	msg := strings.Repeat("X", config.messageLength)
	failure := errors.New("fail")

	// Remember the current size of each file and position it at its end.
	// This keeps the per-run byte count correct and guarantees that new
	// output is appended even if the same file was already used (and
	// inspected) by a previous run.
	offsets := make([]int64, len(files))
	for i, file := range files {
		if file == nil {
			continue
		}
		offset, err := file.Seek(0, io.SeekEnd)
		if err != nil {
			b.Fatal(err)
		}
		offsets[i] = offset
	}

	start := time.Now()

	// Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low.
	n := b.N * 1000

	b.ResetTimer()
	var wg sync.WaitGroup
	for i := 0; i < config.workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// acc accumulates the error percentage; whenever it
			// exceeds 100, one error entry is emitted.
			acc := 0.0
			for i := 0; i < n; i++ {
				if acc > 100 {
					klog.ErrorS(failure, msg, "key", "value")
					acc -= 100
				} else {
					klog.InfoS(msg, "key", "value")
				}
				acc += config.errorPercentage
			}
		}()
	}
	wg.Wait()
	klog.Flush()
	if flush != nil {
		flush()
	}
	b.StopTimer()

	// Print some information about the result.
	end := time.Now()
	duration := end.Sub(start)
	total := n * config.workers
	b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds())

	const previewLen = 50
	for i, file := range files {
		if file == nil {
			continue
		}
		size, err := file.Seek(0, io.SeekEnd)
		if err != nil {
			b.Fatal(err)
		}
		written := size - offsets[i]

		// ReadAt leaves the file offset untouched, so the file stays
		// positioned at its end.
		preview := make([]byte, previewLen)
		read, err := file.ReadAt(preview, offsets[i])
		if err != nil && !errors.Is(err, io.EOF) {
			b.Fatal(err)
		}
		// Drop the unused tail so that short output is not padded
		// with NUL bytes.
		preview = preview[:read]
		if read == previewLen {
			copy(preview[previewLen-3:], "...")
		}
		b.Logf(" %d bytes to file #%d -> %.1fMiB/s (starts with: %s)", written, i, float64(written)/duration.Seconds()/1024/1024, string(preview))
	}
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"flag"
"io"
"github.com/go-logr/logr"
"go.uber.org/zap/zapcore"
logsjson "k8s.io/component-base/logs/json"
"k8s.io/klog/v2"
)
func init() {
	// Route all klog output into the byte-counting sink with as little
	// overhead as possible: no time stamps, no caller information.
	// A test may change a flag via flag.Set, but has to put the value
	// listed here back in place afterwards.
	klog.InitFlags(nil)
	defaults := [][2]string{
		{"alsologtostderr", "false"},
		{"logtostderr", "false"},
		{"skip_headers", "true"},
		{"one_output", "true"},
		{"stderrthreshold", "FATAL"},
	}
	for _, kv := range defaults {
		flag.Set(kv[0], kv[1])
	}
	klog.SetOutput(&output)
}
// bytesWritten is a sink which discards everything written to it and only
// keeps a running total of the number of bytes. The counter is a plain
// integer without any locking.
type bytesWritten int64

// Write adds len(data) to the counter and reports all bytes as written.
func (w *bytesWritten) Write(data []byte) (int, error) {
	n := len(data)
	*w = *w + bytesWritten(n)
	return n, nil
}

// Sync has nothing to flush and always succeeds.
func (w *bytesWritten) Sync() error {
	return nil
}

// output is the shared sink used by the benchmarks.
var output bytesWritten
// jsonLogger writes JSON into the shared byte-counting sink.
var jsonLogger = newJSONLogger(&output)

// newJSONLogger returns a JSON logger writing to out. Its encoder config
// only sets the message key. The flush function that comes with the logger
// is dropped.
func newJSONLogger(out io.Writer) logr.Logger {
	logger, _ := logsjson.NewJSONLogger(
		zapcore.AddSync(out),
		nil,
		&zapcore.EncoderConfig{MessageKey: "msg"},
	)
	return logger
}
// printf emits item with klog's printf-style API: errors via klog.Errorf
// (message, error and key/value slice), everything else via klog.Infof
// (message and key/value slice).
func printf(item logMessage) {
	if !item.isError {
		klog.Infof("%s: %v", item.msg, item.kvs)
		return
	}
	klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs)
}
// Calling klog through these variables keeps logcheck from complaining
// about the dynamic parameters.
var (
	errorS = klog.ErrorS
	infoS  = klog.InfoS
)

// prints emits item with klog's structured API: errors via ErrorS,
// everything else via InfoS.
func prints(item logMessage) {
	if !item.isError {
		infoS(item.msg, item.kvs...)
		return
	}
	errorS(item.err, item.msg, item.kvs...)
}

View File

@ -0,0 +1,2 @@
# This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.
Nov 19 02:13:48 kind-worker2 kubelet[250]: {"ts":1637288028968.0125,"caller":"kuberuntime/kuberuntime_manager.go:902","msg":"Creating container in pod","v":0,"container":{"Name":"terminate-cmd-rpn","Image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","Command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"TerminationMessagePath":"/dev/termination-log"}}

View File

@ -0,0 +1 @@
{"v":0, "msg": "Pod status update", "err": "failed"}

View File

@ -0,0 +1 @@
{"msg": "Pod status update", "err": "failed"}

View File

@ -0,0 +1 @@
{"v": 0, "msg": "Pod status updated"}

View File

@ -0,0 +1 @@
{"v":0, "msg": "Example", "someValue": 1, "someString": "hello world", "pod": {"namespace": "system", "name": "kube-scheduler"}, "pv": {"name": "volume"}}

View File

@ -0,0 +1,269 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
"regexp"
"sort"
"strings"
"text/template"
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
// logMessage is one log entry that was reconstructed from a JSON log line
// and can be replayed through the different logging APIs.
type logMessage struct {
	msg       string        // value of the "msg" field
	verbosity int           // value of the "v" field
	err       error         // value of the "err" field of an error entry
	isError   bool          // true if the line had no "v" field
	kvs       []interface{} // remaining fields as alternating key, value
}
// Keys of logStats.ArgCounts. The statistics template refers to the
// counters by these literal names.
const (
	// Kinds of individual arguments.
	stringArg          = "string"
	multiLineStringArg = "multiLineString"
	objectStringArg    = "objectString"
	numberArg          = "number"
	krefArg            = "kref"
	otherArg           = "other"

	// Counter for all arguments.
	totalArg = "total"
)
// logStats holds the statistics which are gathered while parsing a log file.
type logStats struct {
	// TotalLines counts every line that was handed to the parser.
	TotalLines int
	// JsonLines counts the lines which contained valid JSON.
	JsonLines int
	// ErrorMessages counts the parsed messages that are error entries.
	ErrorMessages int

	// ArgCounts maps an argument kind to its number of occurrences.
	ArgCounts map[string]int
	// OtherLines lists the lines that could not be turned into a message.
	OtherLines []string
	// OtherArgs collects argument values which fit no other kind.
	OtherArgs []interface{}
	// MultiLineArgs collects key/value pairs of strings with line breaks.
	MultiLineArgs [][]string
	// ObjectTypes counts string values that look like dumped API objects,
	// keyed by type name.
	ObjectTypes map[string]int
}
var (
// logStatsTemplate renders a logStats value as a human-readable report.
// Sections for individual argument kinds are only emitted when the
// corresponding counter in ArgCounts is non-zero. The template text is
// whitespace-sensitive because it is printed as-is.
logStatsTemplate = template.Must(template.New("format").Funcs(template.FuncMap{
// percent returns x as an integer percentage of y, or "NA" when y is zero.
"percent": func(x, y int) string {
if y == 0 {
return "NA"
}
return fmt.Sprintf("%d%%", x*100/y)
},
// sub returns x minus y.
"sub": func(x, y int) int {
return x - y
},
}).Parse(`Total number of lines: {{.TotalLines}}
Valid JSON messages: {{.JsonLines}} ({{percent .JsonLines .TotalLines}} of total lines)
Error messages: {{.ErrorMessages}} ({{percent .ErrorMessages .JsonLines}} of valid JSON messages)
Unrecognized lines: {{sub .TotalLines .JsonLines}}
{{range .OtherLines}} {{.}}
{{end}}
Args:
total: {{if .ArgCounts.total}}{{.ArgCounts.total}}{{else}}0{{end}}{{if .ArgCounts.string}}
strings: {{.ArgCounts.string}} ({{percent .ArgCounts.string .ArgCounts.total}}){{end}} {{if .ArgCounts.multiLineString}}
with line breaks: {{.ArgCounts.multiLineString}} ({{percent .ArgCounts.multiLineString .ArgCounts.total}} of all arguments)
{{range .MultiLineArgs}} ===== {{index . 0}} =====
{{index . 1}}
{{end}}{{end}}{{if .ArgCounts.objectString}}
with API objects: {{.ArgCounts.objectString}} ({{percent .ArgCounts.objectString .ArgCounts.total}} of all arguments)
types and their number of usage:{{range $key, $value := .ObjectTypes}} {{ $key }}:{{ $value }}{{end}}{{end}}{{if .ArgCounts.number}}
numbers: {{.ArgCounts.number}} ({{percent .ArgCounts.number .ArgCounts.total}}){{end}}{{if .ArgCounts.kref}}
ObjectRef: {{.ArgCounts.kref}} ({{percent .ArgCounts.kref .ArgCounts.total}}){{end}}{{if .ArgCounts.other}}
others: {{.ArgCounts.other}} ({{percent .ArgCounts.other .ArgCounts.total}}){{end}}
`))
)
// This produces too much output:
// {{range .OtherArgs}} {{.}}
// {{end}}
// Doesn't work?
// Unrecognized lines: {{with $delta := sub .TotalLines .JsonLines}}{{$delta}} ({{percent $delta .TotalLines}} of total lines){{end}}
// String renders the statistics with logStatsTemplate. If rendering fails,
// the text of the error is returned instead.
func (s logStats) String() string {
	var rendered bytes.Buffer
	if err := logStatsTemplate.Execute(&rendered, &s); err != nil {
		return err.Error()
	}
	return rendered.String()
}
// loadLog reads the file at path line by line and converts each line into a
// logMessage. Lines which cannot be parsed are not an error; they are
// recorded in stats.OtherLines together with their zero-based line number.
//
// It returns the parsed messages and the statistics gathered while parsing.
// If the file cannot be opened or read, both are empty and err is set.
func loadLog(path string) (messages []logMessage, stats logStats, err error) {
	// Real log files can contain lines far longer than the default
	// bufio.Scanner limit of 64KiB (for example dumped API objects).
	// Without a larger limit a single such line would make loading of the
	// entire file fail.
	const maxLineLength = 16 * 1024 * 1024

	file, err := os.Open(path)
	if err != nil {
		return nil, logStats{}, err
	}
	defer file.Close()

	stats.ArgCounts = map[string]int{}
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineLength)
	for lineNo := 0; scanner.Scan(); lineNo++ {
		line := scanner.Bytes()
		msg, parseErr := parseLine(line, &stats)
		if parseErr != nil {
			// The reason is not reported, only the offending line.
			stats.OtherLines = append(stats.OtherLines, fmt.Sprintf("%d: %s", lineNo, string(line)))
			continue
		}
		messages = append(messages, msg)
	}

	if scanErr := scanner.Err(); scanErr != nil {
		return nil, logStats{}, fmt.Errorf("reading %s failed: %w", path, scanErr)
	}

	return messages, stats, nil
}
// prefixRE matches the text which may precede the JSON payload of a log line.
//
// systemd prefix:
// Nov 19 02:08:51 kind-worker2 kubelet[250]: {"ts":1637287731687.8315,...
//
// container runtime (CRI) prefix, for either output stream:
// 2021-11-19T02:08:28.475825534Z stderr F {"ts": ...
//
// Both alternatives are anchored at the start of the line, so text inside
// the payload that happens to look like a prefix is left alone.
var prefixRE = regexp.MustCompile(`^(?:\w+ \d+ \S+ \S+ \S+: |\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z (?:stdout|stderr) . )`)

// String format for API structs from generated.pb.go.
// &Container{...}
var objectRE = regexp.MustCompile(`^&([a-zA-Z]*)\{`)
// parseLine converts one log line into a logMessage and updates stats.
//
// A known prefix is stripped before the remainder is parsed as a JSON
// object. The fields "v", "msg" and "err" are mapped to the corresponding
// logMessage fields, "ts" and "caller" are ignored, and everything else ends
// up in item.kvs, sorted by key. A line without "v" is treated as an error
// message. An "err" field in a non-error message is appended to item.kvs as
// a normal key/value pair.
//
// An error is returned if the line is not valid JSON or if one of the
// well-known fields has an unexpected type.
func parseLine(line []byte, stats *logStats) (item logMessage, err error) {
	stats.TotalLines++
	line = prefixRE.ReplaceAll(line, nil)

	content := map[string]interface{}{}
	if err := json.Unmarshal(line, &content); err != nil {
		return logMessage{}, fmt.Errorf("JSON parsing failed: %v", err)
	}
	stats.JsonLines++

	// Process the fields sorted by key. Map iteration order is random,
	// which would otherwise make the order of the entries collected in
	// the statistics, and the choice of the reported error, differ
	// between runs.
	keys := make([]string, 0, len(content))
	for key := range content {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// Only entries with a verbosity are info messages.
	item.isError = true
	for _, key := range keys {
		value := content[key]
		switch key {
		case "v":
			verbosity, ok := value.(float64)
			if !ok {
				return logMessage{}, fmt.Errorf("expected number for v, got: %T %v", value, value)
			}
			item.verbosity = int(verbosity)
			item.isError = false
		case "msg":
			msg, ok := value.(string)
			if !ok {
				return logMessage{}, fmt.Errorf("expected string for msg, got: %T %v", value, value)
			}
			item.msg = msg
		case "ts", "caller":
			// ignore
		case "err":
			errStr, ok := value.(string)
			if !ok {
				return logMessage{}, fmt.Errorf("expected string for err, got: %T %v", value, value)
			}
			item.err = errors.New(errStr)
			stats.ArgCounts[stringArg]++
			stats.ArgCounts[totalArg]++
		default:
			// Replace a generic JSON object with a known type if
			// it matches one.
			if obj := toObject(value); obj != nil {
				value = obj
			}
			switch typed := value.(type) {
			case string:
				stats.ArgCounts[stringArg]++
				if strings.Contains(typed, "\n") {
					stats.ArgCounts[multiLineStringArg]++
					stats.MultiLineArgs = append(stats.MultiLineArgs, []string{key, typed})
				}
				match := objectRE.FindStringSubmatch(typed)
				if match != nil {
					if stats.ObjectTypes == nil {
						stats.ObjectTypes = map[string]int{}
					}
					stats.ArgCounts[objectStringArg]++
					stats.ObjectTypes[match[1]]++
				}
			case float64:
				stats.ArgCounts[numberArg]++
			case klog.ObjectRef:
				stats.ArgCounts[krefArg]++
			default:
				stats.ArgCounts[otherArg]++
				stats.OtherArgs = append(stats.OtherArgs, typed)
			}
			stats.ArgCounts[totalArg]++
			// Keys are visited in sorted order, so item.kvs is
			// sorted by key as well.
			item.kvs = append(item.kvs, key, value)
		}
	}

	if !item.isError && item.err != nil {
		// Error is a normal key/value.
		item.kvs = append(item.kvs, "err", item.err)
		item.err = nil
	}
	if item.isError {
		stats.ErrorMessages++
	}
	return item, nil
}
// This is a list of objects that might have been dumped. The simple ones must
// come first because unmarshaling will try one after the other and an
// ObjectRef would unmarshal fine into any of the others whereas any of the
// other types hopefully have enough extra fields that they won't fit (unknown
// fields are an error).
var objectTypes = []reflect.Type{
reflect.TypeOf(klog.ObjectRef{}),
reflect.TypeOf(&v1.Pod{}),
reflect.TypeOf(&v1.Container{}),
}
// toObject tries to turn a generic JSON object into one of the types listed
// in objectTypes. The value is encoded as JSON again and then decoded
// strictly (unknown fields are rejected) into each candidate type in order.
// The first candidate that decodes without error wins. nil is returned if
// value is not a JSON object, cannot be encoded, or fits no candidate.
func toObject(value interface{}) interface{} {
	fields, isObject := value.(map[string]interface{})
	if !isObject {
		return nil
	}
	encoded, err := json.Marshal(fields)
	if err != nil {
		return nil
	}
	for _, candidate := range objectTypes {
		target := reflect.New(candidate)
		strict := json.NewDecoder(bytes.NewReader(encoded))
		strict.DisallowUnknownFields()
		if strict.Decode(target.Interface()) != nil {
			continue
		}
		return target.Elem().Interface()
	}
	return nil
}

View File

@ -0,0 +1,233 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"bytes"
"errors"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
func TestData(t *testing.T) {
container := v1.Container{
Command: []string{"sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"},
Image: "k8s.gcr.io/e2e-test-images/busybox:1.29-2",
Name: "terminate-cmd-rpn",
TerminationMessagePath: "/dev/termination-log",
}
testcases := map[string]struct {
messages []logMessage
printf, structured, json string
stats logStats
}{
"data/simple.log": {
messages: []logMessage{
{
msg: "Pod status updated",
},
},
printf: `Pod status updated: []
`,
structured: `"Pod status updated"
`,
json: `{"msg":"Pod status updated","v":0}
`,
stats: logStats{
TotalLines: 1,
JsonLines: 1,
ArgCounts: map[string]int{},
},
},
"data/error.log": {
messages: []logMessage{
{
msg: "Pod status update",
err: errors.New("failed"),
isError: true,
},
},
printf: `Pod status update: failed []
`,
structured: `"Pod status update" err="failed"
`,
json: `{"msg":"Pod status update","err":"failed"}
`,
stats: logStats{
TotalLines: 1,
JsonLines: 1,
ErrorMessages: 1,
ArgCounts: map[string]int{
stringArg: 1,
totalArg: 1,
},
},
},
"data/error-value.log": {
messages: []logMessage{
{
msg: "Pod status update",
kvs: []interface{}{"err", errors.New("failed")},
},
},
printf: `Pod status update: [err failed]
`,
structured: `"Pod status update" err="failed"
`,
json: `{"msg":"Pod status update","v":0,"err":"failed"}
`,
stats: logStats{
TotalLines: 1,
JsonLines: 1,
ArgCounts: map[string]int{
stringArg: 1,
totalArg: 1,
},
},
},
"data/values.log": {
messages: []logMessage{
{
msg: "Example",
kvs: []interface{}{
"pod", klog.KRef("system", "kube-scheduler"),
"pv", klog.KRef("", "volume"),
"someString", "hello world",
"someValue", 1.0,
},
},
},
printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1]
`,
structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1
`,
json: `{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1}
`,
stats: logStats{
TotalLines: 1,
JsonLines: 1,
ArgCounts: map[string]int{
stringArg: 1,
krefArg: 2,
numberArg: 1,
totalArg: 4,
},
},
},
"data/container.log": {
messages: []logMessage{
{
msg: "Creating container in pod",
kvs: []interface{}{
"container", &container,
},
},
},
printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c
f=/restart-count/restartCount
count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})
if [ $count -eq 1 ]; then
exit 1
fi
if [ $count -eq 2 ]; then
exit 0
fi
while true; do sleep 1; done
],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}]
`,
structured: `"Creating container in pod" container=<
&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c
f=/restart-count/restartCount
count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})
if [ $count -eq 1 ]; then
exit 1
fi
if [ $count -eq 2 ]; then
exit 0
fi
while true; do sleep 1; done
],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}
>
`,
// This is what the output would look like if the container were logged as a JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead.
// json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"k8s.gcr.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}}
// `,
json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:k8s.gcr.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,}"}
`,
stats: logStats{
TotalLines: 2,
JsonLines: 1,
ArgCounts: map[string]int{
totalArg: 1,
otherArg: 1,
},
OtherLines: []string{
"0: # This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.",
},
OtherArgs: []interface{}{
&container,
},
},
},
}
for path, expected := range testcases {
t.Run(path, func(t *testing.T) {
messages, stats, err := loadLog(path)
if err != nil {
t.Fatalf("unexpected load error: %v", err)
}
assert.Equal(t, expected.messages, messages)
assert.Equal(t, expected.stats, stats)
print := func(format func(item logMessage)) {
for _, item := range expected.messages {
format(item)
}
}
testBuffered := func(t *testing.T, expected string, format func(item logMessage)) {
var buffer bytes.Buffer
klog.SetOutput(&buffer)
defer klog.SetOutput(&output)
print(format)
klog.Flush()
assert.Equal(t, expected, buffer.String())
}
t.Run("printf", func(t *testing.T) {
testBuffered(t, expected.printf, printf)
})
t.Run("structured", func(t *testing.T) {
testBuffered(t, expected.structured, prints)
})
t.Run("json", func(t *testing.T) {
var buffer bytes.Buffer
logger := newJSONLogger(&buffer)
klog.SetLogger(logger)
defer klog.ClearLogger()
print(prints)
klog.Flush()
assert.Equal(t, expected.json, buffer.String())
})
})
}
}

1
vendor/modules.txt vendored
View File

@ -933,6 +933,7 @@ go.uber.org/atomic
# go.uber.org/multierr v1.6.0 => go.uber.org/multierr v1.6.0 # go.uber.org/multierr v1.6.0 => go.uber.org/multierr v1.6.0
go.uber.org/multierr go.uber.org/multierr
# go.uber.org/zap v1.19.0 => go.uber.org/zap v1.19.0 # go.uber.org/zap v1.19.0 => go.uber.org/zap v1.19.0
## explicit
go.uber.org/zap go.uber.org/zap
go.uber.org/zap/buffer go.uber.org/zap/buffer
go.uber.org/zap/internal/bufferpool go.uber.org/zap/internal/bufferpool