diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go
index b5abfb368eb..ae0b1aa6ec5 100644
--- a/test/integration/apiserver/watchcache_test.go
+++ b/test/integration/apiserver/watchcache_test.go
@@ -37,12 +37,12 @@ import (
 // with one of them containing events and the other all other objects.
 func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) {
     etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
-    etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs)
+    etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
     if err != nil {
         t.Fatalf("Couldn't start etcd: %v", err)
     }
-    etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs)
+    etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
     if err != nil {
         t.Fatalf("Couldn't start etcd: %v", err)
     }
diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go
index e0fcff1007c..a7a502da4ca 100644
--- a/test/integration/framework/etcd.go
+++ b/test/integration/framework/etcd.go
@@ -26,6 +26,7 @@ import (
     "os/exec"
     "strings"
     "syscall"
+    "testing"
     "time"
 
     "go.uber.org/goleak"
@@ -62,7 +63,7 @@ func getAvailablePort() (int, error) {
 
 // startEtcd executes an etcd instance. The returned function will signal the
 // etcd process and wait for it to exit.
-func startEtcd() (func(), error) {
+func startEtcd(output io.Writer) (func(), error) {
     etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
     conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://"))
     if err == nil {
@@ -72,7 +73,7 @@ func startEtcd() (func(), error) {
     }
     klog.V(1).Infof("could not connect to etcd: %v", err)
 
-    currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil)
+    currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil, output)
     if err != nil {
         return nil, err
     }
@@ -83,7 +84,7 @@ func startEtcd() (func(), error) {
 }
 
 // RunCustomEtcd starts a custom etcd instance for test purposes.
-func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) {
+func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url string, stopFn func(), err error) {
     // TODO: Check for valid etcd version.
     etcdPath, err := getEtcdPath()
     if err != nil {
@@ -119,8 +120,13 @@ func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn fun
     }
     args = append(args, customFlags...)
     cmd := exec.CommandContext(ctx, etcdPath, args...)
-    cmd.Stdout = os.Stdout
-    cmd.Stderr = os.Stderr
+    if output == nil {
+        cmd.Stdout = os.Stdout
+        cmd.Stderr = os.Stderr
+    } else {
+        cmd.Stdout = output
+        cmd.Stderr = output
+    }
     stop := func() {
         // try to exit etcd gracefully
         defer cancel()
@@ -194,7 +200,7 @@ func EtcdMain(tests func() int) {
         goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
     )
 
-    stop, err := startEtcd()
+    stop, err := startEtcd(nil)
     if err != nil {
         klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err)
     }
@@ -202,27 +208,32 @@ func EtcdMain(tests func() int) {
     stop() // Don't defer this. See os.Exit documentation.
 
     klog.StopFlushDaemon()
 
-    // Several tests don't wait for goroutines to stop. goleak.Find retries
-    // internally, but not long enough. 5 seconds seemed to be enough for
-    // most tests, even when testing in the CI.
-    timeout := 5 * time.Second
-    start := time.Now()
-    for {
-        err := goleak.Find(goleakOpts...)
-        if err == nil {
-            break
-        }
-        if time.Now().Sub(start) >= timeout {
-            klog.ErrorS(err, "EtcdMain goroutine check")
-            result = 1
-            break
-        }
+    if err := goleakFindRetry(goleakOpts...); err != nil {
+        klog.ErrorS(err, "EtcdMain goroutine check")
+        result = 1
     }
 
     os.Exit(result)
 }
 
-// GetEtcdURL returns the URL of the etcd instance started by EtcdMain.
+// GetEtcdURL returns the URL of the etcd instance started by EtcdMain or StartEtcd.
 func GetEtcdURL() string {
     return env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
 }
+
+// StartEtcd starts an etcd instance inside a test. It will abort the test if
+// startup fails and clean up after the test automatically. Stdout and stderr
+// of the etcd binary go to the provided writer.
+//
+// In contrast to EtcdMain, StartEtcd will not do automatic leak checking.
+// Tests can decide if and where they want to do that.
+//
+// Starting etcd multiple times per test run instead of once with EtcdMain
+// provides better separation between different tests.
+func StartEtcd(tb testing.TB, etcdOutput io.Writer) {
+    stop, err := startEtcd(etcdOutput)
+    if err != nil {
+        tb.Fatalf("unable to start etcd: %v", err)
+    }
+    tb.Cleanup(stop)
+}
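The new `output` parameter threads through all three layers (`StartEtcd` → `startEtcd` → `RunCustomEtcd`). A minimal sketch of how a test might use it — the test name and buffer handling are hypothetical, the framework calls are the ones added above:

```go
package example

import (
	"bytes"
	"testing"

	"k8s.io/kubernetes/test/integration/framework"
)

func TestWithPrivateEtcd(t *testing.T) {
	// Per-test etcd instance whose output ends up in the test log;
	// shutdown is registered automatically via tb.Cleanup.
	framework.StartEtcd(t, framework.NewTBWriter(t))

	// Alternatively, capture the output of a customized instance.
	// A nil writer preserves the old behavior (os.Stdout/os.Stderr).
	var buf bytes.Buffer // hypothetical sink, not safe for concurrent reads
	url, stop, err := framework.RunCustomEtcd("example_etcd_data", nil, &buf)
	if err != nil {
		t.Fatalf("couldn't start etcd: %v", err)
	}
	defer stop()
	t.Logf("custom etcd running at %s", url)
}
```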
diff --git a/test/integration/framework/goleak.go b/test/integration/framework/goleak.go
index 5158fff0a37..0790cfd33ab 100644
--- a/test/integration/framework/goleak.go
+++ b/test/integration/framework/goleak.go
@@ -17,6 +17,9 @@ limitations under the License.
 package framework
 
 import (
+    "testing"
+    "time"
+
     "go.uber.org/goleak"
 
     "k8s.io/apiserver/pkg/server/healthz"
 )
@@ -34,3 +37,34 @@ func IgnoreBackgroundGoroutines() []goleak.Option {
 
     return []goleak.Option{goleak.IgnoreCurrent()}
 }
+
+// GoleakCheck sets up leak checking for a test or benchmark.
+// The check runs as a cleanup operation and records an
+// error when goroutines were leaked.
+func GoleakCheck(tb testing.TB, opts ...goleak.Option) {
+    // Must be called *before* creating new goroutines.
+    opts = append(opts, IgnoreBackgroundGoroutines()...)
+
+    tb.Cleanup(func() {
+        if err := goleakFindRetry(opts...); err != nil {
+            tb.Error(err.Error())
+        }
+    })
+}
+
+func goleakFindRetry(opts ...goleak.Option) error {
+    // Several tests don't wait for goroutines to stop. goleak.Find retries
+    // internally, but not long enough. 5 seconds seemed to be enough for
+    // most tests, even when testing in the CI.
+    timeout := 5 * time.Second
+    start := time.Now()
+    for {
+        err := goleak.Find(opts...)
+        if err == nil {
+            return nil
+        }
+        if time.Now().Sub(start) >= timeout {
+            return err
+        }
+    }
+}
diff --git a/test/integration/framework/logger.go b/test/integration/framework/logger.go
new file mode 100644
index 00000000000..68aca0d2bd6
--- /dev/null
+++ b/test/integration/framework/logger.go
@@ -0,0 +1,89 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+    "flag"
+    "io"
+    "testing"
+
+    "k8s.io/klog/v2"
+)
+
+// RedirectKlog modifies the global klog logger so that it writes via the given
+// writer. This only works when different tests run sequentially.
+//
+// The returned cleanup function restores the previous state. Beware that it is
+// not thread-safe: all goroutines which call klog must have been stopped.
+func RedirectKlog(tb testing.TB, output io.Writer) func() {
+    expectNoError := func(err error) {
+        if err != nil {
+            tb.Fatalf("unexpected error: %v", err)
+        }
+    }
+
+    state := klog.CaptureState()
+    defer func() {
+        if r := recover(); r != nil {
+            state.Restore()
+            panic(r)
+        }
+    }()
+    var fs flag.FlagSet
+    klog.InitFlags(&fs)
+    expectNoError(fs.Set("log_file", "/dev/null"))
+    expectNoError(fs.Set("logtostderr", "false"))
+    expectNoError(fs.Set("alsologtostderr", "false"))
+    expectNoError(fs.Set("stderrthreshold", "10"))
+    expectNoError(fs.Set("one_output", "true"))
+    klog.SetOutput(output)
+    return state.Restore
+}
+
+// NewTBWriter creates an io.Writer which turns each write into a tb.Log call.
+//
+// Note that no attempts are made to determine the actual call site because
+// our caller doesn't know about the TB instance and thus cannot mark itself
+// as helper. Therefore the code here doesn't do it either and thus shows up
+// as call site in the testing output. To avoid that, contextual logging
+// and ktesting have to be used.
+func NewTBWriter(tb testing.TB) io.Writer {
+    return testingWriter{TB: tb}
+}
+
+type testingWriter struct {
+    testing.TB
+}
+
+func (tw testingWriter) Write(data []byte) (int, error) {
+    logLen := len(data)
+    if logLen == 0 {
+        return 0, nil
+    }
+    // Trim trailing line break? Log will add it.
+    if data[logLen-1] == '\n' {
+        logLen--
+    }
+    // We could call TB.Helper here, but that doesn't really help because
+    // then our caller (code in klog) will be reported instead, which isn't
+    // right either. klog would have to call TB.Helper itself, but doesn't
+    // know about the TB instance.
+    tw.Log(string(data[:logLen]))
+    return len(data), nil
+}
+
+var _ io.Writer = testingWriter{}
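A sketch of how `GoleakCheck`, `RedirectKlog`, and `NewTBWriter` are meant to compose in a sequentially running test (the test itself is hypothetical). The registration order matters because `testing.TB.Cleanup` callbacks run last-in, first-out:

```go
package example

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/integration/framework"
)

func TestSequential(t *testing.T) {
	// Route global klog output into the test log, restore on cleanup.
	restore := framework.RedirectKlog(t, framework.NewTBWriter(t))
	t.Cleanup(restore)

	// Registered *after* RedirectKlog: its cleanup therefore runs
	// *before* restore, i.e. leaked goroutines are still reported
	// while klog writes into the test log.
	framework.GoleakCheck(t)

	klog.InfoS("this ends up in the output of t")
}
```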
diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md
index a9b38a8450f..a10eec0bf49 100644
--- a/test/integration/scheduler_perf/README.md
+++ b/test/integration/scheduler_perf/README.md
@@ -77,3 +77,15 @@ The configuration file under `config/performance-config.yaml` contains a default
 various scenarios. In case you want to add your own, you can extend the list with new templates.
 It's also possible to extend `op` data type, respectively its underlying data types to extend
 configuration of possible test cases.
+
+### Logging
+
+The default verbosity is 2 (the recommended value for production). -v can be
+used to change this. The log format can be changed with
+-logging-format=text|json. The default is to write into a log file (when using
+the text format) or stderr (when using JSON). Together these options allow
+simulating different real production configurations and comparing their
+performance.
+
+During interactive debugging sessions it is possible to enable per-test output
+via -use-testing-log.
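For illustration, invocations exercising the flags from this README section might look like the following (`-benchtime=1x` and the benchmark selection are just examples, not part of the patch):

```sh
# Production-like: JSON format at the default verbosity of 2.
go test ./test/integration/scheduler_perf -bench=BenchmarkPerfScheduling \
    -benchtime=1x -logging-format=json -v=2

# Interactive debugging: per-test output via testing.TB.Log, more verbose.
go test ./test/integration/scheduler_perf -bench=BenchmarkPerfScheduling \
    -benchtime=1x -use-testing-log -v=5
```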
diff --git a/test/integration/scheduler_perf/main_test.go b/test/integration/scheduler_perf/main_test.go
index 09f3a87136f..2019a255293 100644
--- a/test/integration/scheduler_perf/main_test.go
+++ b/test/integration/scheduler_perf/main_test.go
@@ -18,17 +18,55 @@ package benchmark
 
 import (
     "flag"
+    "fmt"
+    "os"
+    "strings"
     "testing"
 
-    "k8s.io/klog/v2/ktesting"
-    "k8s.io/kubernetes/test/integration/framework"
+    "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/component-base/featuregate"
+    "k8s.io/component-base/logs"
+    logsapi "k8s.io/component-base/logs/api/v1"
+    _ "k8s.io/component-base/logs/json/register"
+    "k8s.io/kubernetes/test/utils/ktesting"
 )
 
 func TestMain(m *testing.M) {
     // Run with -v=2, this is the default log level in production.
-    ktesting.DefaultConfig = ktesting.NewConfig(ktesting.Verbosity(2))
-    ktesting.DefaultConfig.AddFlags(flag.CommandLine)
+    ktesting.SetDefaultVerbosity(2)
+
+    // test/integration/framework/flags.go unconditionally initializes the
+    // logging flags. That's correct for most tests, but in the
+    // scheduler_perf test we want more control over the flags, therefore
+    // we strip them out here.
+    var fs flag.FlagSet
+    flag.CommandLine.VisitAll(func(f *flag.Flag) {
+        switch f.Name {
+        case "log-flush-frequency", "v", "vmodule":
+            // These will be added below ourselves, don't copy.
+        default:
+            fs.Var(f.Value, f.Name, f.Usage)
+        }
+    })
+    flag.CommandLine = &fs
+
+    featureGate := featuregate.NewFeatureGate()
+    runtime.Must(logsapi.AddFeatureGates(featureGate))
+    flag.Var(featureGate, "feature-gate",
+        "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
+            "Options are:\n"+strings.Join(featureGate.KnownFeatures(), "\n"))
+    c := logsapi.NewLoggingConfiguration()
+
+    // This would fail if we hadn't removed the logging flags above.
+    logsapi.AddGoFlags(c, flag.CommandLine)
+
     flag.Parse()
 
-    framework.EtcdMain(m.Run)
+    logs.InitLogs()
+    if err := logsapi.ValidateAndApply(c, featureGate); err != nil {
+        fmt.Fprintf(os.Stderr, "%v\n", err)
+        os.Exit(1)
+    }
+
+    m.Run()
 }
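The logging bootstrap in the new TestMain, reduced to its essence as a standalone sketch — the hard-coded format and verbosity stand in for the flag handling and are assumptions for illustration:

```go
package main

import (
	"fmt"
	"os"

	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/component-base/featuregate"
	"k8s.io/component-base/logs"
	logsapi "k8s.io/component-base/logs/api/v1"
	_ "k8s.io/component-base/logs/json/register" // registers the "json" format
	"k8s.io/klog/v2"
)

func main() {
	// Logging has its own feature gates (e.g. for contextual logging).
	featureGate := featuregate.NewFeatureGate()
	runtime.Must(logsapi.AddFeatureGates(featureGate))

	c := logsapi.NewLoggingConfiguration()
	c.Format = "json"                      // assumption: JSON instead of the text default
	c.Verbosity = logsapi.VerbosityLevel(2) // assumption: production verbosity

	logs.InitLogs()
	if err := logsapi.ValidateAndApply(c, featureGate); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
	klog.InfoS("logging configured", "format", c.Format)
}
```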
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 2eb1b29c76f..9177fc6bb89 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -19,9 +19,13 @@ package benchmark
 import (
     "context"
     "encoding/json"
+    "flag"
     "fmt"
+    "io"
+    "io/ioutil"
     "math"
     "os"
+    "path"
     "strings"
     "sync"
     "testing"
@@ -48,6 +52,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
     "k8s.io/kubernetes/test/integration/framework"
     testutils "k8s.io/kubernetes/test/utils"
+    "k8s.io/kubernetes/test/utils/ktesting"
     "sigs.k8s.io/yaml"
 )
 
@@ -570,6 +575,39 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
     return &so, nil
 }
 
+var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
+
+func initTestOutput(tb testing.TB) io.Writer {
+    var output io.Writer
+    if *useTestingLog {
+        output = framework.NewTBWriter(tb)
+    } else {
+        tmpDir := tb.TempDir()
+        logfileName := path.Join(tmpDir, "output.log")
+        fileOutput, err := os.Create(logfileName)
+        if err != nil {
+            tb.Fatalf("create log file: %v", err)
+        }
+        output = fileOutput
+
+        tb.Cleanup(func() {
+            // Dump the log output when the test is done. The user
+            // can decide how much of it will be visible in case of
+            // success: then "go test" truncates, "go test -v"
+            // doesn't. All of it will be shown for a failure.
+            if err := fileOutput.Close(); err != nil {
+                tb.Fatalf("close log file: %v", err)
+            }
+            log, err := ioutil.ReadFile(logfileName)
+            if err != nil {
+                tb.Fatalf("read log file: %v", err)
+            }
+            tb.Logf("full log output:\n%s", string(log))
+        })
+    }
+    return output
+}
+
 func BenchmarkPerfScheduling(b *testing.B) {
     testCases, err := getTestCases(configFile)
     if err != nil {
@@ -579,19 +617,76 @@ func BenchmarkPerfScheduling(b *testing.B) {
         b.Fatal(err)
     }
 
+    output := initTestOutput(b)
+
+    // Because we run sequentially, it is possible to change the global
+    // klog logger and redirect log output. Quite a lot of code still uses
+    // it instead of supporting contextual logging.
+    //
+    // Because we leak one goroutine which calls klog, we cannot restore
+    // the previous state.
+    _ = framework.RedirectKlog(b, output)
+
     dataItems := DataItems{Version: "v1"}
     for _, tc := range testCases {
         b.Run(tc.Name, func(b *testing.B) {
             for _, w := range tc.Workloads {
                 b.Run(w.Name, func(b *testing.B) {
+                    // Ensure that there are no leaked
+                    // goroutines. They could influence
+                    // performance of the next benchmark.
+                    // This must come *after* RedirectKlog
+                    // because then during cleanup, the
+                    // test will wait for goroutines to
+                    // quit *before* restoring klog settings.
+                    framework.GoleakCheck(b)
+
+                    ctx := context.Background()
+
+                    if *useTestingLog {
+                        // In addition to redirecting klog
+                        // output, also enable contextual
+                        // logging.
+                        _, ctx = ktesting.NewTestContext(b)
+                    }
+
+                    // Now that we are ready to run, start
+                    // etcd.
+                    framework.StartEtcd(b, output)
+
                     // 30 minutes should be plenty enough even for the 5000-node tests.
-                    ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
+                    ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
                     b.Cleanup(cancel)
                     for feature, flag := range tc.FeatureGates {
                         defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
                     }
-                    dataItems.DataItems = append(dataItems.DataItems, runWorkload(ctx, b, tc, w)...)
+                    results := runWorkload(ctx, b, tc, w)
+                    dataItems.DataItems = append(dataItems.DataItems, results...)
+
+                    if len(results) > 0 {
+                        // The default ns/op is not
+                        // useful because it includes
+                        // the time spent on
+                        // initialization and shutdown.
+                        // Here we suppress it.
+                        b.ReportMetric(0, "ns/op")
+
+                        // Instead, report the same
+                        // results that also get stored
+                        // in the JSON file.
+                        for _, result := range results {
+                            // For some metrics like
+                            // scheduler_framework_extension_point_duration_seconds
+                            // the actual value has some
+                            // other unit. We patch the key
+                            // to make it look right.
+                            metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
+                            for key, value := range result.Data {
+                                b.ReportMetric(value, metric+"/"+key)
+                            }
+                        }
+                    }
+
                     // Reset metrics to prevent metrics generated in current workload gets
                     // carried over to the next workload.
                     legacyregistry.Reset()
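The inline metric reporting above can be read as a standalone helper; a sketch in the scope of the benchmark package (`reportDataItems` is a hypothetical name, `DataItem` is the existing scheduler_perf result type with `Data`, `Unit`, and `Labels` fields):

```go
// reportDataItems republishes workload results as benchmark metrics.
func reportDataItems(b *testing.B, results []DataItem) {
	if len(results) == 0 {
		return
	}
	// Suppress the default ns/op: it would include setup and shutdown time.
	b.ReportMetric(0, "ns/op")
	for _, result := range results {
		// Metrics like scheduler_framework_extension_point_duration_seconds
		// may actually carry a different unit; patch the key accordingly.
		metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
		for key, value := range result.Data {
			b.ReportMetric(value, metric+"/"+key)
		}
	}
}
```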
diff --git a/test/utils/ktesting/ktesting.go b/test/utils/ktesting/ktesting.go
new file mode 100644
index 00000000000..15508816265
--- /dev/null
+++ b/test/utils/ktesting/ktesting.go
@@ -0,0 +1,77 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package ktesting is a wrapper around k8s.io/klog/v2/ktesting. It provides
+// those (and only those) functions that test code in Kubernetes should use,
+// plus better dumping of complex datatypes. It adds the klog command line
+// flags and increases the default verbosity to 5.
+package ktesting
+
+import (
+    "context"
+    "flag"
+    "fmt"
+
+    _ "k8s.io/component-base/logs/testinit"
+    "k8s.io/klog/v2"
+    "k8s.io/klog/v2/ktesting"
+    // "k8s.io/kubernetes/test/utils/format"
+)
+
+func init() {
+    // This is a good default for unit tests. Benchmarks should add their own
+    // init function or TestMain to lower the default, for example to 2.
+    SetDefaultVerbosity(5)
+}
+
+// SetDefaultVerbosity can be called during init to modify the default
+// log verbosity of the program.
+func SetDefaultVerbosity(v int) {
+    f := flag.CommandLine.Lookup("v")
+    _ = f.Value.Set(fmt.Sprintf("%d", v))
+}
+
+// NewTestContext is a wrapper around ktesting.NewTestContext with settings
+// specific to Kubernetes.
+func NewTestContext(tl ktesting.TL) (klog.Logger, context.Context) {
+    config := ktesting.NewConfig(
+        // TODO (pohly): merge
+        // https://github.com/kubernetes/klog/pull/363, new klog
+        // release, update and merge
+        // https://github.com/kubernetes/kubernetes/pull/115277, then
+        // uncomment this.
+        //
+        // ktesting.AnyToString(format.AnyToString),
+        ktesting.VerbosityFlagName("v"),
+        ktesting.VModuleFlagName("vmodule"),
+    )
+
+    // Copy klog settings instead of making the ktesting logger
+    // configurable directly.
+    var fs flag.FlagSet
+    config.AddFlags(&fs)
+    for _, name := range []string{"v", "vmodule"} {
+        from := flag.CommandLine.Lookup(name)
+        to := fs.Lookup(name)
+        if err := to.Value.Set(from.Value.String()); err != nil {
+            panic(err)
+        }
+    }
+
+    logger := ktesting.NewLogger(tl, config)
+    ctx := klog.NewContext(context.Background(), logger)
+    return logger, ctx
+}
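Finally, a hypothetical unit test showing the intended use of the new wrapper package:

```go
package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/utils/ktesting"
)

func TestWithLogger(t *testing.T) {
	// The returned context carries a logger which writes through t.Log
	// and honors -v / -vmodule (package default verbosity: 5).
	logger, ctx := ktesting.NewTestContext(t)
	logger.V(5).Info("starting")

	doSomething(ctx)
}

func doSomething(ctx context.Context) {
	// Code under test retrieves the logger from its context.
	klog.FromContext(ctx).Info("doing something")
}
```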