mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
Merge pull request #115358 from pohly/logs-performance-benchmarks
Logs performance benchmarks
This commit is contained in:
commit
7ec3c2727b
10
test/integration/logs/benchmark/.gitignore
vendored
Normal file
10
test/integration/logs/benchmark/.gitignore
vendored
Normal file
@ -0,0 +1,10 @@
|
||||
# Created by get-logs.sh:
|
||||
/ci-kubernetes-kind-e2e-json-logging
|
||||
/data/kind-worker-kubelet.log
|
||||
/data/kube-apiserver.log
|
||||
/data/kube-controller-manager.log
|
||||
/data/kube-scheduler.log
|
||||
/data/v3/kind-worker-kubelet.log
|
||||
/data/v3/kube-apiserver.log
|
||||
/data/v3/kube-controller-manager.log
|
||||
/data/v3/kube-scheduler.log
|
@ -6,7 +6,7 @@ must be benchmarked before and after the change.
|
||||
## Running the benchmark
|
||||
|
||||
```
|
||||
$ go test -bench=. -test.benchmem -benchmem .
|
||||
go test -v -bench=. -benchmem -benchtime=10s .
|
||||
```
|
||||
|
||||
## Real log data
|
||||
@ -28,9 +28,29 @@ Prow job:
|
||||
- `artifacts/logs/kind-control-plane/containers`
|
||||
- `artifacts/logs/kind-*/kubelet.log`
|
||||
|
||||
With sufficient credentials, `gsutil` can be used to download everything for a job with:
|
||||
With sufficient credentials, `gsutil` can be used to download everything for a job directly
|
||||
into a directory that then will be used by the benchmarks automatically:
|
||||
|
||||
```
|
||||
gsutil -m cp -R gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/<job ID> .
|
||||
kubernetes$ test/integration/logs/benchmark/get-logs.sh
|
||||
++ dirname test/integration/logs/benchmark/get-logs.sh
|
||||
+ cd test/integration/logs/benchmark
|
||||
++ latest_job
|
||||
++ gsutil cat gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/latest-build.txt
|
||||
+ job=1618864842834186240
|
||||
+ rm -rf ci-kubernetes-kind-e2e-json-logging
|
||||
+ mkdir ci-kubernetes-kind-e2e-json-logging
|
||||
...
|
||||
```
|
||||
|
||||
This sets up the `data` directory so that additional test cases are available
|
||||
(`BenchmarkEncoding/v3/kind-worker-kubelet/`,
|
||||
`BenchmarkEncoding/kube-scheduler/`, etc.).
|
||||
|
||||
|
||||
To clean up, use
|
||||
```
|
||||
git clean -fx test/integration/logs/benchmark
|
||||
```
|
||||
|
||||
## Analyzing log data
|
||||
@ -39,5 +59,5 @@ While loading a file, some statistics about it are collected. Those are shown
|
||||
when running with:
|
||||
|
||||
```
|
||||
$ go test -v -bench=. -test.benchmem -benchmem .
|
||||
go test -v -bench=BenchmarkEncoding/none -run=none .
|
||||
```
|
||||
|
@ -18,12 +18,10 @@ package benchmark
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
@ -32,16 +30,18 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/component-base/featuregate"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
logsjson "k8s.io/component-base/logs/json"
|
||||
_ "k8s.io/component-base/logs/json/register"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func BenchmarkEncoding(b *testing.B) {
|
||||
seen := map[string]bool{}
|
||||
|
||||
// Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log
|
||||
// messages. We generate one sub-benchmark for each file where logging
|
||||
// is tested with the log level from the directory. Symlinks can be
|
||||
// used to test the same file with different levels.
|
||||
// is tested with the log level from the directory.
|
||||
if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
@ -53,8 +53,14 @@ func BenchmarkEncoding(b *testing.B) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Mode()&fs.ModeSymlink == 0 {
|
||||
b.Log(path + "\n" + stats.String())
|
||||
// Only print unique file statistics. They get shown for the
|
||||
// first file where new statistics are encountered. The
|
||||
// assumption here is that the there are no files with
|
||||
// different content and exactly the same statistics.
|
||||
statsStr := stats.String()
|
||||
if !seen[statsStr] {
|
||||
b.Log(path + "\n" + statsStr)
|
||||
seen[statsStr] = true
|
||||
}
|
||||
b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) {
|
||||
// Take verbosity threshold from directory, if present.
|
||||
@ -63,52 +69,57 @@ func BenchmarkEncoding(b *testing.B) {
|
||||
if vMatch != nil {
|
||||
v, _ = strconv.Atoi(vMatch[1])
|
||||
}
|
||||
|
||||
fileSizes := map[string]int{}
|
||||
b.Run("stats", func(b *testing.B) {
|
||||
// Nothing to do. Use this for "go test -v
|
||||
// -bench=BenchmarkLogging/.*/stats" to print
|
||||
// just the statistics.
|
||||
})
|
||||
b.Run("printf", func(b *testing.B) {
|
||||
test := func(b *testing.B, format string, print func(logger klog.Logger, item logMessage)) {
|
||||
state := klog.CaptureState()
|
||||
defer state.Restore()
|
||||
|
||||
var output bytesWritten
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
c.Format = format
|
||||
o := logsapi.LoggingOptions{
|
||||
ErrorStream: &output,
|
||||
InfoStream: &output,
|
||||
}
|
||||
klog.SetOutput(&output)
|
||||
if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil {
|
||||
b.Fatalf("Unexpected error configuring logging: %v", err)
|
||||
}
|
||||
logger := klog.Background()
|
||||
b.ResetTimer()
|
||||
output = 0
|
||||
start := time.Now()
|
||||
total := int64(0)
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, item := range messages {
|
||||
if item.verbosity <= v {
|
||||
printf(item)
|
||||
print(logger, item)
|
||||
total++
|
||||
}
|
||||
}
|
||||
}
|
||||
fileSizes["printf"] = int(output) / b.N
|
||||
end := time.Now()
|
||||
duration := end.Sub(start)
|
||||
|
||||
// Report messages/s instead of ns/op because "op" varies.
|
||||
b.ReportMetric(0, "ns/op")
|
||||
b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")
|
||||
fileSizes[filepath.Base(b.Name())] = int(output)
|
||||
}
|
||||
|
||||
b.Run("printf", func(b *testing.B) {
|
||||
test(b, "text", func(_ klog.Logger, item logMessage) {
|
||||
printf(item)
|
||||
})
|
||||
})
|
||||
b.Run("structured", func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
output = 0
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, item := range messages {
|
||||
if item.verbosity <= v {
|
||||
prints(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
fileSizes["structured"] = int(output) / b.N
|
||||
test(b, "text", prints)
|
||||
})
|
||||
b.Run("JSON", func(b *testing.B) {
|
||||
klog.SetLogger(jsonLogger)
|
||||
defer klog.ClearLogger()
|
||||
b.ResetTimer()
|
||||
output = 0
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, item := range messages {
|
||||
if item.verbosity <= v {
|
||||
prints(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
fileSizes["JSON"] = int(output) / b.N
|
||||
test(b, "json", prints)
|
||||
})
|
||||
|
||||
b.Log(fmt.Sprintf("file sizes: %v\n", fileSizes))
|
||||
b.Log(fmt.Sprintf("%s: file sizes: %v\n", path, fileSizes))
|
||||
})
|
||||
return nil
|
||||
}); err != nil {
|
||||
@ -135,9 +146,6 @@ type loadGeneratorConfig struct {
|
||||
// See https://github.com/kubernetes/kubernetes/issues/107029 for the
|
||||
// motivation.
|
||||
func BenchmarkWriting(b *testing.B) {
|
||||
flag.Set("skip_headers", "false")
|
||||
defer flag.Set("skip_headers", "true")
|
||||
|
||||
// This could be made configurable and/or we could benchmark different
|
||||
// configurations automatically.
|
||||
config := loadGeneratorConfig{
|
||||
@ -159,70 +167,92 @@ func benchmarkWriting(b *testing.B, config loadGeneratorConfig) {
|
||||
}
|
||||
|
||||
func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) {
|
||||
tmpDir := b.TempDir()
|
||||
b.Run("structured", func(b *testing.B) {
|
||||
var out *os.File
|
||||
if !discard {
|
||||
var err error
|
||||
out, err = os.Create(path.Join(tmpDir, "all.log"))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
klog.SetOutput(out)
|
||||
defer klog.SetOutput(&output)
|
||||
}
|
||||
generateOutput(b, config, nil, out)
|
||||
benchmarkOutputFormat(b, config, discard, "text")
|
||||
})
|
||||
b.Run("JSON", func(b *testing.B) {
|
||||
var out1, out2 *os.File
|
||||
if !discard {
|
||||
var err error
|
||||
out1, err = os.Create(path.Join(tmpDir, "stream-1.log"))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer out1.Close()
|
||||
out2, err = os.Create(path.Join(tmpDir, "stream-2.log"))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer out2.Close()
|
||||
}
|
||||
o := logsapi.LoggingOptions{}
|
||||
if discard {
|
||||
o.ErrorStream = io.Discard
|
||||
o.InfoStream = io.Discard
|
||||
} else {
|
||||
o.ErrorStream = out1
|
||||
o.InfoStream = out1
|
||||
}
|
||||
|
||||
b.Run("single-stream", func(b *testing.B) {
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
logger, control := logsjson.Factory{}.Create(*c, o)
|
||||
klog.SetLogger(logger)
|
||||
defer klog.ClearLogger()
|
||||
generateOutput(b, config, control.Flush, out1)
|
||||
})
|
||||
|
||||
b.Run("split-stream", func(b *testing.B) {
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
c.Options.JSON.SplitStream = true
|
||||
logger, control := logsjson.Factory{}.Create(*c, o)
|
||||
klog.SetLogger(logger)
|
||||
defer klog.ClearLogger()
|
||||
generateOutput(b, config, control.Flush, out1, out2)
|
||||
})
|
||||
benchmarkOutputFormat(b, config, discard, "json")
|
||||
})
|
||||
}
|
||||
|
||||
func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) {
|
||||
func benchmarkOutputFormat(b *testing.B, config loadGeneratorConfig, discard bool, format string) {
|
||||
b.Run("single-stream", func(b *testing.B) {
|
||||
benchmarkOutputFormatStream(b, config, discard, format, false)
|
||||
})
|
||||
b.Run("split-stream", func(b *testing.B) {
|
||||
benchmarkOutputFormatStream(b, config, discard, format, true)
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, discard bool, format string, splitStreams bool) {
|
||||
tmpDir := b.TempDir()
|
||||
state := klog.CaptureState()
|
||||
defer state.Restore()
|
||||
|
||||
featureGate := featuregate.NewFeatureGate()
|
||||
logsapi.AddFeatureGates(featureGate)
|
||||
if err := featureGate.SetFromMap(map[string]bool{
|
||||
string(logsapi.LoggingAlphaOptions): true,
|
||||
string(logsapi.LoggingBetaOptions): true,
|
||||
}); err != nil {
|
||||
b.Fatalf("Set feature gates: %v", err)
|
||||
}
|
||||
|
||||
// Create a logging configuration using the exact same code as a normal
|
||||
// component. In order to redirect output, we provide a LoggingOptions
|
||||
// instance.
|
||||
var o logsapi.LoggingOptions
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
c.Format = format
|
||||
if splitStreams {
|
||||
c.Options.JSON.SplitStream = true
|
||||
if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil {
|
||||
b.Fatalf("Error setting buffer size: %v", err)
|
||||
}
|
||||
}
|
||||
var files []*os.File
|
||||
if discard {
|
||||
o.ErrorStream = io.Discard
|
||||
o.InfoStream = io.Discard
|
||||
} else {
|
||||
out1, err := os.Create(filepath.Join(tmpDir, "stream-1.log"))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer out1.Close()
|
||||
out2, err := os.Create(filepath.Join(tmpDir, "stream-2.log"))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer out2.Close()
|
||||
|
||||
if splitStreams {
|
||||
files = append(files, out1, out2)
|
||||
o.ErrorStream = out1
|
||||
o.InfoStream = out2
|
||||
} else {
|
||||
files = append(files, out1)
|
||||
o.ErrorStream = out1
|
||||
o.InfoStream = out1
|
||||
}
|
||||
}
|
||||
|
||||
klog.SetOutput(o.ErrorStream)
|
||||
if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil {
|
||||
b.Fatalf("Unexpected error configuring logging: %v", err)
|
||||
}
|
||||
|
||||
generateOutput(b, config, files...)
|
||||
}
|
||||
|
||||
func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) {
|
||||
msg := strings.Repeat("X", config.messageLength)
|
||||
err := errors.New("fail")
|
||||
start := time.Now()
|
||||
|
||||
// Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low.
|
||||
n := b.N * 1000
|
||||
total := config.workers * n
|
||||
|
||||
b.ResetTimer()
|
||||
var wg sync.WaitGroup
|
||||
@ -245,15 +275,15 @@ func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), file
|
||||
}
|
||||
wg.Wait()
|
||||
klog.Flush()
|
||||
if flush != nil {
|
||||
flush()
|
||||
}
|
||||
b.StopTimer()
|
||||
|
||||
// Print some information about the result.
|
||||
end := time.Now()
|
||||
duration := end.Sub(start)
|
||||
total := n * config.workers
|
||||
|
||||
// Report messages/s instead of ns/op because "op" varies.
|
||||
b.ReportMetric(0, "ns/op")
|
||||
b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s")
|
||||
|
||||
// Print some information about the result.
|
||||
b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds())
|
||||
for i, file := range files {
|
||||
if file != nil {
|
||||
|
@ -18,28 +18,20 @@ package benchmark
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"io"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"go.uber.org/zap/zapcore"
|
||||
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
logsjson "k8s.io/component-base/logs/json"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Cause all klog output to be discarded with minimal overhead.
|
||||
// We don't include time stamps and caller information.
|
||||
// Individual tests can change that by calling flag.Set again,
|
||||
// but should always restore this state here.
|
||||
// hack/make-rules/test-integration.sh expects that all unit tests
|
||||
// support -v and -vmodule.
|
||||
klog.InitFlags(nil)
|
||||
|
||||
// Write all output into a single file.
|
||||
flag.Set("alsologtostderr", "false")
|
||||
flag.Set("logtostderr", "false")
|
||||
flag.Set("skip_headers", "true")
|
||||
flag.Set("one_output", "true")
|
||||
flag.Set("stderrthreshold", "FATAL")
|
||||
klog.SetOutput(&output)
|
||||
}
|
||||
|
||||
type bytesWritten int64
|
||||
@ -50,22 +42,6 @@ func (b *bytesWritten) Write(data []byte) (int, error) {
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (b *bytesWritten) Sync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var output bytesWritten
|
||||
var jsonLogger = newJSONLogger(&output)
|
||||
|
||||
func newJSONLogger(out io.Writer) logr.Logger {
|
||||
encoderConfig := &zapcore.EncoderConfig{
|
||||
MessageKey: "msg",
|
||||
}
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
logger, _ := logsjson.NewJSONLogger(c.Verbosity, zapcore.AddSync(out), nil, encoderConfig)
|
||||
return logger
|
||||
}
|
||||
|
||||
func printf(item logMessage) {
|
||||
if item.isError {
|
||||
klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs)
|
||||
@ -74,17 +50,14 @@ func printf(item logMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
// These variables are a workaround for logcheck complaining about the dynamic
|
||||
// parameters.
|
||||
var (
|
||||
errorS = klog.ErrorS
|
||||
infoS = klog.InfoS
|
||||
)
|
||||
|
||||
func prints(item logMessage) {
|
||||
func prints(logger klog.Logger, item logMessage) {
|
||||
if item.isError {
|
||||
errorS(item.err, item.msg, item.kvs...)
|
||||
logger.Error(item.err, item.msg, item.kvs...) // nolint: logcheck
|
||||
} else {
|
||||
infoS(item.msg, item.kvs...)
|
||||
logger.Info(item.msg, item.kvs...) // nolint: logcheck
|
||||
}
|
||||
}
|
||||
|
||||
func printLogger(item logMessage) {
|
||||
prints(klog.Background(), item)
|
||||
}
|
||||
|
@ -1,2 +0,0 @@
|
||||
# This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.
|
||||
Nov 19 02:13:48 kind-worker2 kubelet[250]: {"ts":1637288028968.0125,"caller":"kuberuntime/kuberuntime_manager.go:902","msg":"Creating container in pod","v":0,"container":{"Name":"terminate-cmd-rpn","Image":"registry.k8s.io/e2e-test-images/busybox:1.29-2","Command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"TerminationMessagePath":"/dev/termination-log"}}
|
3
test/integration/logs/benchmark/data/split.log
Normal file
3
test/integration/logs/benchmark/data/split.log
Normal file
@ -0,0 +1,3 @@
|
||||
{"v": 0,
|
||||
"msg": "Pod status updated"}
|
||||
{"v": 0, "msg": "Pod status updated again"}
|
1
test/integration/logs/benchmark/data/versionresponse.log
Normal file
1
test/integration/logs/benchmark/data/versionresponse.log
Normal file
@ -0,0 +1 @@
|
||||
{"ts":1678174403964.6985,"caller":"remote/remote_runtime.go:147","msg":"[RemoteRuntimeService] Version Response","v":10,"apiVersion":{"version":"0.1.0","runtime_name":"containerd","runtime_version":"v1.6.18","runtime_api_version":"v1"}}
|
73
test/integration/logs/benchmark/get-logs.sh
Executable file
73
test/integration/logs/benchmark/get-logs.sh
Executable file
@ -0,0 +1,73 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Copyright 2018 The Kubernetes Authors.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Usage: get-logs.sh [<job ID>]
|
||||
#
|
||||
# Downloads the latest job output or the one with the specified ID
|
||||
# and prepares running benchmarks for it.
|
||||
|
||||
set -o pipefail
|
||||
set -o errexit
|
||||
set -x
|
||||
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
latest_job () {
|
||||
gsutil cat gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/latest-build.txt
|
||||
}
|
||||
|
||||
job=${1:-$(latest_job)}
|
||||
|
||||
rm -rf ci-kubernetes-kind-e2e-json-logging
|
||||
mkdir ci-kubernetes-kind-e2e-json-logging
|
||||
gsutil -m cp -R "gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/${job}/*" ci-kubernetes-kind-e2e-json-logging/
|
||||
|
||||
for i in kube-apiserver kube-controller-manager kube-scheduler; do
|
||||
# Before (container runtime log dump (?)):
|
||||
# 2023-03-07T07:30:52.193301924Z stderr F {"ts":1678174252192.0676,"caller":"scheduler/schedule_one.go:81","msg":"Attempting to schedule pod","v":3,"pod":{"name":"simpletest.rc-zgd47","namespace":"gc-5422"}}
|
||||
# After:
|
||||
# {"ts":1678174252192.0676,"caller":"scheduler/schedule_one.go:81","msg":"Attempting to schedule pod","v":3,"pod":{"name":"simpletest.rc-zgd47","namespace":"gc-5422"}}
|
||||
sed -e 's/^20[^ ]* stderr . //' \
|
||||
ci-kubernetes-kind-e2e-json-logging/artifacts/kind-control-plane/containers/$i-*.log \
|
||||
> ci-kubernetes-kind-e2e-json-logging/$i.log;
|
||||
done
|
||||
|
||||
# Before (systemd format):
|
||||
# Mar 07 07:22:05 kind-control-plane kubelet[288]: {"ts":1678173725722.4487,"caller":"flag/flags.go:64","msg":"FLAG: --address=\"0.0.0.0\"\n","v":1}
|
||||
# After:
|
||||
# {"ts":1678173725722.4487,"caller":"flag/flags.go:64","msg":"FLAG: --address=\"0.0.0.0\"\n","v":1}
|
||||
grep 'kind-worker kubelet' ci-kubernetes-kind-e2e-json-logging/artifacts/kind-worker/kubelet.log | \
|
||||
sed -e 's;^.* kind-worker kubelet[^ ]*: ;;' > ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log
|
||||
|
||||
# Create copies of the actual files, whether they already exist or not. To
|
||||
# clean up disk space, use "git clean -fx test/integration/logs/benchmark".
|
||||
copy () {
|
||||
from="$1"
|
||||
to="$2"
|
||||
|
||||
mkdir -p "$(dirname "$to")"
|
||||
rm -f "$to"
|
||||
cp "$from" "$to"
|
||||
}
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log data/kind-worker-kubelet.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-apiserver.log data/kube-apiserver.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-controller-manager.log data/kube-controller-manager.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-scheduler.log data/kube-scheduler.log
|
||||
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log data/v3/kind-worker-kubelet.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-apiserver.log data/v3/kube-apiserver.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-controller-manager.log data/v3/kube-controller-manager.log
|
||||
copy ci-kubernetes-kind-e2e-json-logging/kube-scheduler.log data/v3/kube-scheduler.log
|
@ -29,7 +29,8 @@ import (
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
runtimev1 "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@ -52,7 +53,7 @@ const (
|
||||
)
|
||||
|
||||
type logStats struct {
|
||||
TotalLines, JsonLines, ErrorMessages int
|
||||
TotalLines, JsonLines, SplitLines, ErrorMessages int
|
||||
|
||||
ArgCounts map[string]int
|
||||
OtherLines []string
|
||||
@ -73,10 +74,11 @@ var (
|
||||
return x - y
|
||||
},
|
||||
}).Parse(`Total number of lines: {{.TotalLines}}
|
||||
JSON line continuation: {{.SplitLines}}
|
||||
Valid JSON messages: {{.JsonLines}} ({{percent .JsonLines .TotalLines}} of total lines)
|
||||
Error messages: {{.ErrorMessages}} ({{percent .ErrorMessages .JsonLines}} of valid JSON messages)
|
||||
Unrecognized lines: {{sub .TotalLines .JsonLines}}
|
||||
{{range .OtherLines}} {{.}}
|
||||
Unrecognized lines: {{sub (sub .TotalLines .JsonLines) .SplitLines}}
|
||||
{{range .OtherLines}} {{if gt (len .) 80}}{{slice . 0 80}}{{else}}{{.}}{{end}}
|
||||
{{end}}
|
||||
Args:
|
||||
total: {{if .ArgCounts.total}}{{.ArgCounts.total}}{{else}}0{{end}}{{if .ArgCounts.string}}
|
||||
@ -119,14 +121,28 @@ func loadLog(path string) (messages []logMessage, stats logStats, err error) {
|
||||
|
||||
stats.ArgCounts = map[string]int{}
|
||||
scanner := bufio.NewScanner(file)
|
||||
var buffer bytes.Buffer
|
||||
for lineNo := 0; scanner.Scan(); lineNo++ {
|
||||
stats.TotalLines++
|
||||
line := scanner.Bytes()
|
||||
msg, err := parseLine(line, &stats)
|
||||
buffer.Write(line)
|
||||
msg, err := parseLine(buffer.Bytes(), &stats)
|
||||
if err != nil {
|
||||
// JSON might have been split across multiple lines.
|
||||
var jsonErr *json.SyntaxError
|
||||
if errors.As(err, &jsonErr) && jsonErr.Offset > 1 {
|
||||
// The start of the buffer was okay. Keep the
|
||||
// data and add the next line to it.
|
||||
stats.SplitLines++
|
||||
continue
|
||||
}
|
||||
stats.OtherLines = append(stats.OtherLines, fmt.Sprintf("%d: %s", lineNo, string(line)))
|
||||
buffer.Reset()
|
||||
continue
|
||||
}
|
||||
stats.JsonLines++
|
||||
messages = append(messages, msg)
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
@ -136,26 +152,16 @@ func loadLog(path string) (messages []logMessage, stats logStats, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// systemd prefix:
|
||||
// Nov 19 02:08:51 kind-worker2 kubelet[250]: {"ts":1637287731687.8315,...
|
||||
//
|
||||
// kubectl (?) prefix:
|
||||
// 2021-11-19T02:08:28.475825534Z stderr F {"ts": ...
|
||||
var prefixRE = regexp.MustCompile(`^\w+ \d+ \S+ \S+ \S+: |\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z stderr . `)
|
||||
|
||||
// String format for API structs from generated.pb.go.
|
||||
// &Container{...}
|
||||
var objectRE = regexp.MustCompile(`^&([a-zA-Z]*)\{`)
|
||||
|
||||
func parseLine(line []byte, stats *logStats) (item logMessage, err error) {
|
||||
stats.TotalLines++
|
||||
line = prefixRE.ReplaceAll(line, nil)
|
||||
|
||||
content := map[string]interface{}{}
|
||||
if err := json.Unmarshal(line, &content); err != nil {
|
||||
return logMessage{}, fmt.Errorf("JSON parsing failed: %v", err)
|
||||
return logMessage{}, fmt.Errorf("JSON parsing failed: %w", err)
|
||||
}
|
||||
stats.JsonLines++
|
||||
|
||||
kvs := map[string]interface{}{}
|
||||
item.isError = true
|
||||
@ -244,6 +250,7 @@ func parseLine(line []byte, stats *logStats) (item logMessage, err error) {
|
||||
// fields are an error).
|
||||
var objectTypes = []reflect.Type{
|
||||
reflect.TypeOf(klog.ObjectRef{}),
|
||||
reflect.TypeOf(&runtimev1.VersionResponse{}),
|
||||
reflect.TypeOf(&v1.Pod{}),
|
||||
reflect.TypeOf(&v1.Container{}),
|
||||
}
|
||||
|
@ -19,24 +19,29 @@ package benchmark
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
_ "k8s.io/component-base/logs/json/register"
|
||||
runtimev1 "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func TestData(t *testing.T) {
|
||||
container := v1.Container{
|
||||
Command: []string{"sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"},
|
||||
Image: "registry.k8s.io/e2e-test-images/busybox:1.29-2",
|
||||
Name: "terminate-cmd-rpn",
|
||||
TerminationMessagePath: "/dev/termination-log",
|
||||
versionResponse := &runtimev1.VersionResponse{
|
||||
Version: "0.1.0",
|
||||
RuntimeName: "containerd",
|
||||
RuntimeVersion: "v1.6.18",
|
||||
RuntimeApiVersion: "v1",
|
||||
}
|
||||
|
||||
testcases := map[string]struct {
|
||||
messages []logMessage
|
||||
messages []logMessage
|
||||
// These are subsets of the full output and may be empty.
|
||||
// Prefix and variable stack traces therefore aren't compared.
|
||||
printf, structured, json string
|
||||
stats logStats
|
||||
}{
|
||||
@ -46,18 +51,31 @@ func TestData(t *testing.T) {
|
||||
msg: "Pod status updated",
|
||||
},
|
||||
},
|
||||
printf: `Pod status updated: []
|
||||
`,
|
||||
structured: `"Pod status updated"
|
||||
`,
|
||||
json: `{"msg":"Pod status updated","v":0}
|
||||
`,
|
||||
printf: `Pod status updated: []`,
|
||||
structured: `"Pod status updated"`,
|
||||
json: `"msg":"Pod status updated","v":0`,
|
||||
stats: logStats{
|
||||
TotalLines: 1,
|
||||
JsonLines: 1,
|
||||
ArgCounts: map[string]int{},
|
||||
},
|
||||
},
|
||||
"data/split.log": {
|
||||
messages: []logMessage{
|
||||
{
|
||||
msg: "Pod status updated",
|
||||
},
|
||||
{
|
||||
msg: "Pod status updated again",
|
||||
},
|
||||
},
|
||||
stats: logStats{
|
||||
TotalLines: 3,
|
||||
SplitLines: 1,
|
||||
JsonLines: 2,
|
||||
ArgCounts: map[string]int{},
|
||||
},
|
||||
},
|
||||
"data/error.log": {
|
||||
messages: []logMessage{
|
||||
{
|
||||
@ -66,12 +84,9 @@ func TestData(t *testing.T) {
|
||||
isError: true,
|
||||
},
|
||||
},
|
||||
printf: `Pod status update: failed []
|
||||
`,
|
||||
structured: `"Pod status update" err="failed"
|
||||
`,
|
||||
json: `{"msg":"Pod status update","err":"failed"}
|
||||
`,
|
||||
printf: `Pod status update: failed []`,
|
||||
structured: `"Pod status update" err="failed"`,
|
||||
json: `"msg":"Pod status update","err":"failed"`,
|
||||
stats: logStats{
|
||||
TotalLines: 1,
|
||||
JsonLines: 1,
|
||||
@ -89,12 +104,9 @@ func TestData(t *testing.T) {
|
||||
kvs: []interface{}{"err", errors.New("failed")},
|
||||
},
|
||||
},
|
||||
printf: `Pod status update: [err failed]
|
||||
`,
|
||||
structured: `"Pod status update" err="failed"
|
||||
`,
|
||||
json: `{"msg":"Pod status update","v":0,"err":"failed"}
|
||||
`,
|
||||
printf: `Pod status update: [err failed]`,
|
||||
structured: `"Pod status update" err="failed"`,
|
||||
json: `"msg":"Pod status update","v":0,"err":"failed"`,
|
||||
stats: logStats{
|
||||
TotalLines: 1,
|
||||
JsonLines: 1,
|
||||
@ -116,12 +128,9 @@ func TestData(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1]
|
||||
`,
|
||||
structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1
|
||||
`,
|
||||
json: `{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1}
|
||||
`,
|
||||
printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1]`,
|
||||
structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1`,
|
||||
json: `"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1`,
|
||||
stats: logStats{
|
||||
TotalLines: 1,
|
||||
JsonLines: 1,
|
||||
@ -133,100 +142,78 @@ func TestData(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
"data/container.log": {
|
||||
"data/versionresponse.log": {
|
||||
messages: []logMessage{
|
||||
{
|
||||
msg: "Creating container in pod",
|
||||
msg: "[RemoteRuntimeService] Version Response",
|
||||
verbosity: 10,
|
||||
kvs: []interface{}{
|
||||
"container", &container,
|
||||
"apiVersion", versionResponse,
|
||||
},
|
||||
},
|
||||
},
|
||||
printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c
|
||||
f=/restart-count/restartCount
|
||||
count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})
|
||||
if [ $count -eq 1 ]; then
|
||||
exit 1
|
||||
fi
|
||||
if [ $count -eq 2 ]; then
|
||||
exit 0
|
||||
fi
|
||||
while true; do sleep 1; done
|
||||
],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}]
|
||||
`,
|
||||
structured: `"Creating container in pod" container=<
|
||||
&Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c
|
||||
f=/restart-count/restartCount
|
||||
count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})
|
||||
if [ $count -eq 1 ]; then
|
||||
exit 1
|
||||
fi
|
||||
if [ $count -eq 2 ]; then
|
||||
exit 0
|
||||
fi
|
||||
while true; do sleep 1; done
|
||||
],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}
|
||||
>
|
||||
`,
|
||||
// This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead.
|
||||
// json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"registry.k8s.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}}
|
||||
// `,
|
||||
json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}"}
|
||||
`,
|
||||
printf: `[RemoteRuntimeService] Version Response: [apiVersion &VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}]`,
|
||||
structured: `"[RemoteRuntimeService] Version Response" apiVersion="&VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}"`,
|
||||
// Because of
|
||||
// https://github.com/kubernetes/kubernetes/issues/106652
|
||||
// we get the string instead of a JSON struct.
|
||||
json: `"msg":"[RemoteRuntimeService] Version Response","v":0,"apiVersion":"&VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}"`,
|
||||
stats: logStats{
|
||||
TotalLines: 2,
|
||||
TotalLines: 1,
|
||||
JsonLines: 1,
|
||||
ArgCounts: map[string]int{
|
||||
totalArg: 1,
|
||||
otherArg: 1,
|
||||
},
|
||||
OtherLines: []string{
|
||||
"0: # This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.",
|
||||
},
|
||||
OtherArgs: []interface{}{
|
||||
&container,
|
||||
versionResponse,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for path, expected := range testcases {
|
||||
t.Run(path, func(t *testing.T) {
|
||||
messages, stats, err := loadLog(path)
|
||||
for filePath, expected := range testcases {
|
||||
t.Run(filePath, func(t *testing.T) {
|
||||
messages, stats, err := loadLog(filePath)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected load error: %v", err)
|
||||
}
|
||||
assert.Equal(t, expected.messages, messages)
|
||||
assert.Equal(t, expected.stats, stats)
|
||||
print := func(format func(item logMessage)) {
|
||||
printAll := func(format func(item logMessage)) {
|
||||
for _, item := range expected.messages {
|
||||
format(item)
|
||||
}
|
||||
}
|
||||
testBuffered := func(t *testing.T, expected string, format func(item logMessage)) {
|
||||
testBuffered := func(t *testing.T, expected string, format string, print func(item logMessage)) {
|
||||
var buffer bytes.Buffer
|
||||
c := logsapi.NewLoggingConfiguration()
|
||||
c.Format = format
|
||||
o := logsapi.LoggingOptions{
|
||||
ErrorStream: &buffer,
|
||||
InfoStream: &buffer,
|
||||
}
|
||||
klog.SetOutput(&buffer)
|
||||
defer klog.SetOutput(&output)
|
||||
if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil {
|
||||
t.Fatalf("Unexpected error configuring logging: %v", err)
|
||||
}
|
||||
|
||||
print(format)
|
||||
printAll(print)
|
||||
klog.Flush()
|
||||
assert.Equal(t, expected, buffer.String())
|
||||
|
||||
if !strings.Contains(buffer.String(), expected) {
|
||||
t.Errorf("Expected log output to contain:\n%s\nActual log output:\n%s\n", expected, buffer.String())
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("printf", func(t *testing.T) {
|
||||
testBuffered(t, expected.printf, printf)
|
||||
testBuffered(t, expected.printf, "text", printf)
|
||||
})
|
||||
t.Run("structured", func(t *testing.T) {
|
||||
testBuffered(t, expected.structured, prints)
|
||||
testBuffered(t, expected.structured, "text", printLogger)
|
||||
})
|
||||
t.Run("json", func(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
logger := newJSONLogger(&buffer)
|
||||
klog.SetLogger(logger)
|
||||
defer klog.ClearLogger()
|
||||
print(prints)
|
||||
klog.Flush()
|
||||
assert.Equal(t, expected.json, buffer.String())
|
||||
testBuffered(t, expected.json, "json", printLogger)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user