From c008732948907db3417f99fd8726a0c6c645fd93 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Tue, 31 Jan 2023 10:22:13 +0100
Subject: [PATCH 1/4] test/integration: add StartEtcd

In contrast to EtcdMain, it can be called by individual tests or
benchmarks and each caller will get a fresh etcd instance. However, it
uses the same underlying code and the same port for all instances, so
tests cannot run in parallel.
---
 test/integration/apiserver/watchcache_test.go |  4 +-
 test/integration/framework/etcd.go            | 55 +++++++++++--------
 test/integration/framework/goleak.go          | 34 ++++++++++++
 3 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go
index b5abfb368eb..ae0b1aa6ec5 100644
--- a/test/integration/apiserver/watchcache_test.go
+++ b/test/integration/apiserver/watchcache_test.go
@@ -37,12 +37,12 @@ import (
 // with one of them containing events and the other all other objects.
 func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) {
 	etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
-	etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs)
+	etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
 	if err != nil {
 		t.Fatalf("Couldn't start etcd: %v", err)
 	}

-	etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs)
+	etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
 	if err != nil {
 		t.Fatalf("Couldn't start etcd: %v", err)
 	}
diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go
index e0fcff1007c..a7a502da4ca 100644
--- a/test/integration/framework/etcd.go
+++ b/test/integration/framework/etcd.go
@@ -26,6 +26,7 @@ import (
 	"os/exec"
 	"strings"
 	"syscall"
+	"testing"
 	"time"

 	"go.uber.org/goleak"
@@ -62,7 +63,7 @@ func getAvailablePort() (int, error) {

 // startEtcd executes an etcd instance. The returned function will signal the
 // etcd process and wait for it to exit.
-func startEtcd() (func(), error) {
+func startEtcd(output io.Writer) (func(), error) {
 	etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
 	conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://"))
 	if err == nil {
@@ -72,7 +73,7 @@
 	}
 	klog.V(1).Infof("could not connect to etcd: %v", err)

-	currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil)
+	currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil, output)
 	if err != nil {
 		return nil, err
 	}
@@ -83,7 +84,7 @@
 }

 // RunCustomEtcd starts a custom etcd instance for test purposes.
-func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) {
+func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url string, stopFn func(), err error) {
 	// TODO: Check for valid etcd version.
 	etcdPath, err := getEtcdPath()
 	if err != nil {
@@ -119,8 +120,13 @@
 	}
 	args = append(args, customFlags...)
 	cmd := exec.CommandContext(ctx, etcdPath, args...)
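+	// A nil writer keeps the historic behavior of inheriting the test
+	// binary's stdout/stderr; a non-nil writer (as passed in by StartEtcd)
+	// captures the etcd output instead.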
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
+	if output == nil {
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+	} else {
+		cmd.Stdout = output
+		cmd.Stderr = output
+	}
 	stop := func() {
 		// try to exit etcd gracefully
 		defer cancel()
@@ -194,7 +200,7 @@ func EtcdMain(tests func() int) {
 		goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
 	)

-	stop, err := startEtcd()
+	stop, err := startEtcd(nil)
 	if err != nil {
 		klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err)
 	}
@@ -202,27 +208,32 @@ func EtcdMain(tests func() int) {
 	stop() // Don't defer this. See os.Exit documentation.

 	klog.StopFlushDaemon()

-	// Several tests don't wait for goroutines to stop. goleak.Find retries
-	// internally, but not long enough. 5 seconds seemed to be enough for
-	// most tests, even when testing in the CI.
-	timeout := 5 * time.Second
-	start := time.Now()
-	for {
-		err := goleak.Find(goleakOpts...)
-		if err == nil {
-			break
-		}
-		if time.Now().Sub(start) >= timeout {
-			klog.ErrorS(err, "EtcdMain goroutine check")
-			result = 1
-			break
-		}
+	if err := goleakFindRetry(goleakOpts...); err != nil {
+		klog.ErrorS(err, "EtcdMain goroutine check")
+		result = 1
 	}

 	os.Exit(result)
 }

-// GetEtcdURL returns the URL of the etcd instance started by EtcdMain.
+// GetEtcdURL returns the URL of the etcd instance started by EtcdMain or StartEtcd.
 func GetEtcdURL() string {
 	return env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
 }
+
+// StartEtcd starts an etcd instance inside a test. It will abort the test if
+// startup fails and clean up after the test automatically. Stdout and stderr
+// of the etcd binary go to the provided writer.
+//
+// In contrast to EtcdMain, StartEtcd will not do automatic leak checking.
+// Tests can decide if and where they want to do that.
+//
+// Starting etcd multiple times per test run instead of once with EtcdMain
+// provides better separation between different tests.
+func StartEtcd(tb testing.TB, etcdOutput io.Writer) {
+	stop, err := startEtcd(etcdOutput)
+	if err != nil {
+		tb.Fatalf("unable to start etcd: %v", err)
+	}
+	tb.Cleanup(stop)
+}
diff --git a/test/integration/framework/goleak.go b/test/integration/framework/goleak.go
index 5158fff0a37..0790cfd33ab 100644
--- a/test/integration/framework/goleak.go
+++ b/test/integration/framework/goleak.go
@@ -17,6 +17,9 @@ limitations under the License.
 package framework

 import (
+	"testing"
+	"time"
+
 	"go.uber.org/goleak"

 	"k8s.io/apiserver/pkg/server/healthz"
 )
@@ -34,3 +37,34 @@ func IgnoreBackgroundGoroutines() []goleak.Option {
 	return []goleak.Option{goleak.IgnoreCurrent()}
 }
+
+// GoleakCheck sets up leak checking for a test or benchmark.
+// The check runs as a cleanup operation and records an
+// error when goroutines were leaked.
+func GoleakCheck(tb testing.TB, opts ...goleak.Option) {
+	// Must be called *before* creating new goroutines.
+	opts = append(opts, IgnoreBackgroundGoroutines()...)
+
+	tb.Cleanup(func() {
+		if err := goleakFindRetry(opts...); err != nil {
+			tb.Error(err.Error())
+		}
+	})
+}
+
+func goleakFindRetry(opts ...goleak.Option) error {
+	// Several tests don't wait for goroutines to stop. goleak.Find retries
+	// internally, but not long enough. 5 seconds seemed to be enough for
+	// most tests, even when testing in the CI.
+	timeout := 5 * time.Second
+	start := time.Now()
+	for {
+		err := goleak.Find(opts...)
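+		// Find sleeps between its own internal retries, so this outer
+		// loop polls frequently without busy-spinning.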
+		if err == nil {
+			return nil
+		}
+		if time.Since(start) >= timeout {
+			return err
+		}
+	}
+}

From 00d14595307f47e6e36984c9794b122e5f6b986d Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 2 Feb 2023 14:04:19 +0100
Subject: [PATCH 2/4] test/utils: extend ktesting

The upstream ktesting has to be very flexible to accommodate different
ways of using it. In Kubernetes, we can be opinionated and make certain
choices, like using klog flags, and only those.
---
 test/utils/ktesting/ktesting.go | 77 +++++++++++++++++++++++++++++++++
 1 file changed, 77 insertions(+)
 create mode 100644 test/utils/ktesting/ktesting.go

diff --git a/test/utils/ktesting/ktesting.go b/test/utils/ktesting/ktesting.go
new file mode 100644
index 00000000000..15508816265
--- /dev/null
+++ b/test/utils/ktesting/ktesting.go
@@ -0,0 +1,77 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package ktesting is a wrapper around k8s.io/klog/v2/ktesting. It provides
+// those (and only those) functions that test code in Kubernetes should use,
+// plus better dumping of complex datatypes. It adds the klog command line
+// flags and increases the default verbosity to 5.
+package ktesting
+
+import (
+	"context"
+	"flag"
+	"fmt"
+
+	_ "k8s.io/component-base/logs/testinit"
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
+	// "k8s.io/kubernetes/test/utils/format"
+)
+
+func init() {
+	// This is a good default for unit tests. Benchmarks should add their own
+	// init function or TestMain to lower the default, for example to 2.
+	SetDefaultVerbosity(5)
+}
+
+// SetDefaultVerbosity can be called during init to modify the default
+// log verbosity of the program.
+func SetDefaultVerbosity(v int) {
+	f := flag.CommandLine.Lookup("v")
+	_ = f.Value.Set(fmt.Sprintf("%d", v))
+}
+
+// NewTestContext is a wrapper around ktesting.NewTestContext with settings
+// specific to Kubernetes.
+func NewTestContext(tl ktesting.TL) (klog.Logger, context.Context) {
+	config := ktesting.NewConfig(
+		// TODO (pohly): merge
+		// https://github.com/kubernetes/klog/pull/363, new klog
+		// release, update and merge
+		// https://github.com/kubernetes/kubernetes/pull/115277, then
+		// uncomment this.
+		//
+		// ktesting.AnyToString(format.AnyToString),
+		ktesting.VerbosityFlagName("v"),
+		ktesting.VModuleFlagName("vmodule"),
+	)
+
+	// Copy klog settings instead of making the ktesting logger
+	// configurable directly.
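+	// A throwaway FlagSet is enough for that: config.AddFlags registers
+	// "v" and "vmodule" on it, then their values get seeded from the
+	// identically named global command line flags.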
+	var fs flag.FlagSet
+	config.AddFlags(&fs)
+	for _, name := range []string{"v", "vmodule"} {
+		from := flag.CommandLine.Lookup(name)
+		to := fs.Lookup(name)
+		if err := to.Value.Set(from.Value.String()); err != nil {
+			panic(err)
+		}
+	}
+
+	logger := ktesting.NewLogger(tl, config)
+	ctx := klog.NewContext(context.Background(), logger)
+	return logger, ctx
+}

From 961129c5f19868199d326b1830e87f94f96acd92 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Mon, 27 Feb 2023 20:36:02 +0100
Subject: [PATCH 3/4] scheduler_perf: add logging flags

This enables testing of different real production configurations (JSON
vs. text, different log levels, contextual logging).
---
 test/integration/framework/logger.go          | 89 +++++++++++++++++++
 test/integration/scheduler_perf/README.md     | 12 +++
 test/integration/scheduler_perf/main_test.go  | 48 ++++++++--
 .../scheduler_perf/scheduler_perf_test.go     | 72 ++++++++++++++-
 4 files changed, 215 insertions(+), 6 deletions(-)
 create mode 100644 test/integration/framework/logger.go

diff --git a/test/integration/framework/logger.go b/test/integration/framework/logger.go
new file mode 100644
index 00000000000..68aca0d2bd6
--- /dev/null
+++ b/test/integration/framework/logger.go
@@ -0,0 +1,89 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+	"flag"
+	"io"
+	"testing"
+
+	"k8s.io/klog/v2"
+)
+
+// RedirectKlog modifies the global klog logger so that it writes via the given
+// writer. This only works when different tests run sequentially.
+//
+// The returned cleanup function restores the previous state. Beware that it is
+// not thread-safe; all goroutines which call klog must have been stopped.
+func RedirectKlog(tb testing.TB, output io.Writer) func() {
+	expectNoError := func(err error) {
+		if err != nil {
+			tb.Fatalf("unexpected error: %v", err)
+		}
+	}
+
+	state := klog.CaptureState()
+	defer func() {
+		if r := recover(); r != nil {
+			state.Restore()
+			panic(r)
+		}
+	}()
+	var fs flag.FlagSet
+	klog.InitFlags(&fs)
+	expectNoError(fs.Set("log_file", "/dev/null"))
+	expectNoError(fs.Set("logtostderr", "false"))
+	expectNoError(fs.Set("alsologtostderr", "false"))
+	expectNoError(fs.Set("stderrthreshold", "10"))
+	expectNoError(fs.Set("one_output", "true"))
+	klog.SetOutput(output)
+	return state.Restore
+}
+
+// NewTBWriter creates an io.Writer which turns each write into a tb.Log call.
+//
+// Note that no attempts are made to determine the actual call site because
+// our caller doesn't know about the TB instance and thus cannot mark itself
+// as helper. Therefore the code here doesn't do it either and thus shows up
+// as the call site in the testing output. To avoid that, contextual logging
+// and ktesting have to be used.
+func NewTBWriter(tb testing.TB) io.Writer {
+	return testingWriter{TB: tb}
+}
+
+type testingWriter struct {
+	testing.TB
+}
+
+func (tw testingWriter) Write(data []byte) (int, error) {
+	logLen := len(data)
+	if logLen == 0 {
+		return 0, nil
+	}
+	// Trim trailing line break? Log will add it.
+	if data[logLen-1] == '\n' {
+		logLen--
+	}
+	// We could call TB.Helper here, but that doesn't really help because
+	// then our caller (code in klog) will be reported instead, which isn't
+	// right either. klog would have to call TB.Helper itself, but doesn't
+	// know about the TB instance.
+	tw.Log(string(data[:logLen]))
+	return len(data), nil
+}
+
+var _ io.Writer = testingWriter{}
diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md
index a9b38a8450f..a10eec0bf49 100644
--- a/test/integration/scheduler_perf/README.md
+++ b/test/integration/scheduler_perf/README.md
@@ -77,3 +77,15 @@ The configuration file under `config/performance-config.yaml` contains a default
 various scenarios. In case you want to add your own, you can extend the
 list with new templates. It's also possible to extend `op` data type,
 respectively its underlying data types to extend configuration of possible test cases.
+
+### Logging
+
+The default verbosity is 2 (the recommended value for production). It can be
+changed with `-v`. The log format can be changed with
+`-logging-format=text|json`. The default is to write into a log file (when
+using the text format) or to stderr (when using JSON). Together these options
+allow simulating different real production configurations and comparing their
+performance.
+
+During interactive debugging sessions it is possible to enable per-test output
+via `-use-testing-log`.
diff --git a/test/integration/scheduler_perf/main_test.go b/test/integration/scheduler_perf/main_test.go
index 09f3a87136f..2019a255293 100644
--- a/test/integration/scheduler_perf/main_test.go
+++ b/test/integration/scheduler_perf/main_test.go
@@ -18,17 +18,55 @@ package benchmark

 import (
 	"flag"
+	"fmt"
+	"os"
+	"strings"
 	"testing"

-	"k8s.io/klog/v2/ktesting"
-	"k8s.io/kubernetes/test/integration/framework"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/component-base/featuregate"
+	"k8s.io/component-base/logs"
+	logsapi "k8s.io/component-base/logs/api/v1"
+	_ "k8s.io/component-base/logs/json/register"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )

 func TestMain(m *testing.M) {
 	// Run with -v=2, this is the default log level in production.
-	ktesting.DefaultConfig = ktesting.NewConfig(ktesting.Verbosity(2))
-	ktesting.DefaultConfig.AddFlags(flag.CommandLine)
+	ktesting.SetDefaultVerbosity(2)
+
+	// test/integration/framework/flags.go unconditionally initializes the
+	// logging flags. That's correct for most tests, but in the
+	// scheduler_perf test we want more control over the flags, therefore
+	// we strip them out here.
+	var fs flag.FlagSet
+	flag.CommandLine.VisitAll(func(f *flag.Flag) {
+		switch f.Name {
+		case "log-flush-frequency", "v", "vmodule":
+			// These get registered again below, don't copy them.
+		default:
+			fs.Var(f.Value, f.Name, f.Usage)
+		}
+	})
+	flag.CommandLine = &fs
+
+	featureGate := featuregate.NewFeatureGate()
+	runtime.Must(logsapi.AddFeatureGates(featureGate))
+	flag.Var(featureGate, "feature-gate",
+		"A set of key=value pairs that describe feature gates for alpha/experimental features. "+
+			"Options are:\n"+strings.Join(featureGate.KnownFeatures(), "\n"))
+	c := logsapi.NewLoggingConfiguration()
+
+	// This would fail if we hadn't removed the logging flags above.
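+	// "Fail" concretely means a panic: logsapi.AddGoFlags registers -v,
+	// -vmodule and -log-flush-frequency, and the flag package panics when
+	// the same name gets registered twice.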
+	logsapi.AddGoFlags(c, flag.CommandLine)
+
 	flag.Parse()
-	framework.EtcdMain(m.Run)
+
+	logs.InitLogs()
+	if err := logsapi.ValidateAndApply(c, featureGate); err != nil {
+		fmt.Fprintf(os.Stderr, "%v\n", err)
+		os.Exit(1)
+	}
+
+	m.Run()
 }
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 2eb1b29c76f..18f018a46bf 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -19,9 +19,13 @@ import (
 	"context"
 	"encoding/json"
+	"flag"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"math"
 	"os"
+	"path"
 	"strings"
 	"sync"
 	"testing"
@@ -48,6 +52,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
 	"k8s.io/kubernetes/test/integration/framework"
 	testutils "k8s.io/kubernetes/test/utils"
+	"k8s.io/kubernetes/test/utils/ktesting"
 	"sigs.k8s.io/yaml"
 )
@@ -570,6 +575,39 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
 	return &so, nil
 }

+var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
+
+func initTestOutput(tb testing.TB) io.Writer {
+	var output io.Writer
+	if *useTestingLog {
+		output = framework.NewTBWriter(tb)
+	} else {
+		tmpDir := tb.TempDir()
+		logfileName := path.Join(tmpDir, "output.log")
+		fileOutput, err := os.Create(logfileName)
+		if err != nil {
+			tb.Fatalf("create log file: %v", err)
+		}
+		output = fileOutput
+
+		tb.Cleanup(func() {
+			// Dump the log output when the test is done. The user
+			// can decide how much of it will be visible in case of
+			// success: then "go test" truncates, "go test -v"
+			// doesn't. All of it will be shown for a failure.
+			if err := fileOutput.Close(); err != nil {
+				tb.Fatalf("close log file: %v", err)
+			}
+			log, err := ioutil.ReadFile(logfileName)
+			if err != nil {
+				tb.Fatalf("read log file: %v", err)
+			}
+			tb.Logf("full log output:\n%s", string(log))
+		})
+	}
+	return output
+}
+
 func BenchmarkPerfScheduling(b *testing.B) {
 	testCases, err := getTestCases(configFile)
 	if err != nil {
 		b.Fatal(err)
 	}
 	if err = validateTestCases(testCases); err != nil {
 		b.Fatal(err)
 	}

+	output := initTestOutput(b)
+
+	// Because we run sequentially, it is possible to change the global
+	// klog logger and redirect log output. Quite a lot of code still uses
+	// it instead of supporting contextual logging.
+	//
+	// Because we leak one goroutine which calls klog, we cannot restore
+	// the previous state.
+	_ = framework.RedirectKlog(b, output)
+
 	dataItems := DataItems{Version: "v1"}
 	for _, tc := range testCases {
 		b.Run(tc.Name, func(b *testing.B) {
 			for _, w := range tc.Workloads {
 				b.Run(w.Name, func(b *testing.B) {
+					// Ensure that there are no leaked
+					// goroutines. They could influence
+					// performance of the next benchmark.
+					// This must be set up *after* RedirectKlog
+					// because then, during cleanup, the
+					// test will wait for goroutines to
+					// quit *before* restoring the klog settings.
+					framework.GoleakCheck(b)
+
+					ctx := context.Background()
+
+					if *useTestingLog {
+						// In addition to redirecting the
+						// klog output, also enable
+						// contextual logging.
+						_, ctx = ktesting.NewTestContext(b)
+					}
+
+					// Now that we are ready to run, start etcd.
+					framework.StartEtcd(b, output)
+
 					// 30 minutes should be plenty enough even for the 5000-node tests.
-					ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
+					ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
 					b.Cleanup(cancel)

 					for feature, flag := range tc.FeatureGates {

From cc4bcd1d8e98d206cf0d3d00ae2847ddaa098992 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Fri, 27 Jan 2023 14:17:23 +0100
Subject: [PATCH 4/4] scheduler_perf: report data items as benchmark results

This replaces the pretty useless ns/op metric (useless because it
includes setup and teardown times) with the same values that also get
stored in the JSON file. The main advantage is that benchstat can be
used to analyze and compare results.
---
 .../scheduler_perf/scheduler_perf_test.go | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 18f018a46bf..9177fc6bb89 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -661,7 +661,32 @@ func BenchmarkPerfScheduling(b *testing.B) {
 				for feature, flag := range tc.FeatureGates {
 					defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
 				}
-				dataItems.DataItems = append(dataItems.DataItems, runWorkload(ctx, b, tc, w)...)
+				results := runWorkload(ctx, b, tc, w)
+				dataItems.DataItems = append(dataItems.DataItems, results...)
+
+				if len(results) > 0 {
+					// The default ns/op is not useful because
+					// it includes the time spent on
+					// initialization and shutdown. Here we
+					// suppress it.
+					b.ReportMetric(0, "ns/op")
+
+					// Instead, report the same results that
+					// also get stored in the JSON file.
+					for _, result := range results {
+						// For some metrics like
+						// scheduler_framework_extension_point_duration_seconds
+						// the actual value has some other
+						// unit. We patch the key to make it
+						// look right.
+						metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
+						for key, value := range result.Data {
+							b.ReportMetric(value, metric+"/"+key)
+						}
+					}
+				}
+
 				// Reset metrics to prevent metrics generated in the current workload
 				// from being carried over to the next workload.
 				legacyregistry.Reset()
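
Taken together, the helpers added in this series compose into a self-contained
integration test. The following sketch is illustrative rather than part of the
patches; the package name, test name and log message are made up, and only the
framework and ktesting APIs shown in the diffs above are used:

	package benchmark

	import (
		"testing"

		"k8s.io/kubernetes/test/integration/framework"
		"k8s.io/kubernetes/test/utils/ktesting"
	)

	func TestWithFreshEtcd(t *testing.T) {
		// Leak checking has to be set up before the test spawns goroutines.
		framework.GoleakCheck(t)

		// Per-test contextual logging; entries end up in t.Log.
		logger, ctx := ktesting.NewTestContext(t)

		// A fresh etcd instance whose output is interleaved with the
		// test log instead of going to os.Stdout/os.Stderr.
		framework.StartEtcd(t, framework.NewTBWriter(t))

		logger.Info("etcd is running", "url", framework.GetEtcdURL())
		_ = ctx // pass ctx to clients, informers, etc.
	}

With ns/op suppressed and the data items reported through b.ReportMetric, the
output of two "go test -bench" runs can be captured to files and compared with
benchstat.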