Merge pull request #115425 from pohly/scheduler-perf-benchstat

scheduler perf: benchstat support
This commit is contained in:
Kubernetes Prow Robot 2023-03-01 11:19:29 -08:00 committed by GitHub
commit 60eefa8066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 387 additions and 31 deletions

View File

@ -37,12 +37,12 @@ import (
// with one of them containing events and the other all other objects.
func multiEtcdSetup(t *testing.T) (clientset.Interface, framework.TearDownFunc) {
etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs)
etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
if err != nil {
t.Fatalf("Couldn't start etcd: %v", err)
}
etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs)
etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
if err != nil {
t.Fatalf("Couldn't start etcd: %v", err)
}

View File

@ -26,6 +26,7 @@ import (
"os/exec"
"strings"
"syscall"
"testing"
"time"
"go.uber.org/goleak"
@ -62,7 +63,7 @@ func getAvailablePort() (int, error) {
// startEtcd executes an etcd instance. The returned function will signal the
// etcd process and wait for it to exit.
func startEtcd() (func(), error) {
func startEtcd(output io.Writer) (func(), error) {
etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://"))
if err == nil {
@ -72,7 +73,7 @@ func startEtcd() (func(), error) {
}
klog.V(1).Infof("could not connect to etcd: %v", err)
currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil)
currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil, output)
if err != nil {
return nil, err
}
@ -83,7 +84,7 @@ func startEtcd() (func(), error) {
}
// RunCustomEtcd starts a custom etcd instance for test purposes.
func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) {
func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url string, stopFn func(), err error) {
// TODO: Check for valid etcd version.
etcdPath, err := getEtcdPath()
if err != nil {
@ -119,8 +120,13 @@ func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn fun
}
args = append(args, customFlags...)
cmd := exec.CommandContext(ctx, etcdPath, args...)
if output == nil {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
} else {
cmd.Stdout = output
cmd.Stderr = output
}
stop := func() {
// try to exit etcd gracefully
defer cancel()
@ -194,7 +200,7 @@ func EtcdMain(tests func() int) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
)
stop, err := startEtcd()
stop, err := startEtcd(nil)
if err != nil {
klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err)
}
@ -202,27 +208,32 @@ func EtcdMain(tests func() int) {
stop() // Don't defer this. See os.Exit documentation.
klog.StopFlushDaemon()
// Several tests don't wait for goroutines to stop. goleak.Find retries
// internally, but not long enough. 5 seconds seemed to be enough for
// most tests, even when testing in the CI.
timeout := 5 * time.Second
start := time.Now()
for {
err := goleak.Find(goleakOpts...)
if err == nil {
break
}
if time.Now().Sub(start) >= timeout {
if err := goleakFindRetry(goleakOpts...); err != nil {
klog.ErrorS(err, "EtcdMain goroutine check")
result = 1
break
}
}
os.Exit(result)
}
// GetEtcdURL returns the URL of the etcd instance started by EtcdMain.
// GetEtcdURL reports the URL of the etcd instance that EtcdMain or StartEtcd
// brought up. The value is looked up through the KUBE_INTEGRATION_ETCD_URL
// environment variable, with the local default endpoint as fallback.
func GetEtcdURL() string {
	const (
		envName     = "KUBE_INTEGRATION_ETCD_URL"
		fallbackURL = "http://127.0.0.1:2379"
	)
	return env.GetEnvAsStringOrFallback(envName, fallbackURL)
}
// StartEtcd brings up etcd for the duration of a single test or benchmark.
//
// A startup failure aborts the test through tb.Fatalf. On success the stop
// function is registered with tb.Cleanup, so callers do not need to shut etcd
// down themselves. Stdout and stderr of the etcd binary are sent to
// etcdOutput.
//
// Unlike EtcdMain, no goroutine leak checking is performed here; tests decide
// if and where they want to do that. Starting etcd per test instead of once
// via EtcdMain gives better separation between different tests.
func StartEtcd(tb testing.TB, etcdOutput io.Writer) {
	stopEtcd, startErr := startEtcd(etcdOutput)
	if startErr != nil {
		tb.Fatalf("unable to start etcd: %v", startErr)
	}
	tb.Cleanup(stopEtcd)
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package framework
import (
"testing"
"time"
"go.uber.org/goleak"
"k8s.io/apiserver/pkg/server/healthz"
)
@ -34,3 +37,34 @@ func IgnoreBackgroundGoroutines() []goleak.Option {
return []goleak.Option{goleak.IgnoreCurrent()}
}
// GoleakCheck arranges for a goroutine leak check at the end of a test or
// benchmark. The check is registered as a cleanup operation and reports
// leaked goroutines as a test error.
func GoleakCheck(tb testing.TB, opts ...goleak.Option) {
	// The background goroutine options are computed right away, not during
	// cleanup: GoleakCheck must therefore be called *before* the test
	// creates new goroutines.
	opts = append(opts, IgnoreBackgroundGoroutines()...)

	verify := func() {
		err := goleakFindRetry(opts...)
		if err == nil {
			return
		}
		tb.Error(err.Error())
	}
	tb.Cleanup(verify)
}
// goleakFindRetry calls goleak.Find repeatedly until it reports no leaked
// goroutines or until the retry period has elapsed. It returns nil on success
// and the error of the last attempt otherwise.
func goleakFindRetry(opts ...goleak.Option) error {
	// Several tests don't wait for their goroutines to stop. goleak.Find
	// already retries internally, just not for long enough. 5 seconds
	// seemed to be enough for most tests, even when testing in the CI.
	const timeout = 5 * time.Second

	for start := time.Now(); ; {
		err := goleak.Find(opts...)
		if err == nil || time.Since(start) >= timeout {
			return err
		}
	}
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"flag"
"io"
"testing"
"k8s.io/klog/v2"
)
// RedirectKlog modifies the global klog logger so that it writes via the given
// writer. This only works when different tests run sequentially.
//
// The returned cleanup function restores the previous state. Beware that it is
// not thread-safe, all goroutines which call klog must have been stopped.
//
// If reconfiguring klog fails, the test is aborted with tb.Fatalf and the
// klog state captured at the start of the call is restored before returning
// control to the testing package.
func RedirectKlog(tb testing.TB, output io.Writer) func() {
	expectNoError := func(err error) {
		if err != nil {
			tb.Fatalf("unexpected error: %v", err)
		}
	}

	state := klog.CaptureState()

	// Undo partial changes whenever this function does not complete
	// normally. tb.Fatalf stops the goroutine via runtime.Goexit, which
	// runs deferred calls but is invisible to recover(), so a success flag
	// is used instead of recover. A panic simply continues to propagate
	// after the deferred function has restored the state.
	redirected := false
	defer func() {
		if !redirected {
			state.Restore()
		}
	}()

	var fs flag.FlagSet
	klog.InitFlags(&fs)
	expectNoError(fs.Set("log_file", "/dev/null"))
	expectNoError(fs.Set("logtostderr", "false"))
	expectNoError(fs.Set("alsologtostderr", "false"))
	expectNoError(fs.Set("stderrthreshold", "10"))
	expectNoError(fs.Set("one_output", "true"))
	klog.SetOutput(output)

	redirected = true
	return state.Restore
}
// NewTBWriter returns an io.Writer which forwards every Write to tb.Log.
//
// No attempt is made to determine the actual call site: the code calling
// Write doesn't know about the TB instance and thus cannot mark itself as
// helper. Therefore the writer doesn't do it either and shows up as call site
// in the testing output. To avoid that, contextual logging and ktesting have
// to be used.
func NewTBWriter(tb testing.TB) io.Writer {
	return testingWriter{TB: tb}
}

// testingWriter adapts a testing.TB to the io.Writer interface.
type testingWriter struct {
	testing.TB
}

// Compile-time check that testingWriter implements io.Writer.
var _ io.Writer = testingWriter{}

// Write logs data as a single entry via Log and reports all of data as
// written. An empty write is a no-op.
func (tw testingWriter) Write(data []byte) (int, error) {
	written := len(data)
	if written == 0 {
		return 0, nil
	}

	// Log adds a line break itself, so a single trailing one is dropped.
	msg := data
	if msg[written-1] == '\n' {
		msg = msg[:written-1]
	}

	// TB.Helper is intentionally not called: it would merely make our
	// caller (code in klog) the reported call site, which isn't right
	// either. klog would have to call TB.Helper itself, but doesn't know
	// about the TB instance.
	tw.Log(string(msg))
	return written, nil
}

View File

@ -77,3 +77,15 @@ The configuration file under `config/performance-config.yaml` contains a default
various scenarios. In case you want to add your own, you can extend the list with new templates.
It's also possible to extend `op` data type, respectively its underlying data types
to extend configuration of possible test cases.
### Logging
The default verbosity is 2 (the recommended value for production). -v can be
used to change this. The log format can be changed with
-logging-format=text|json. The default is to write into a log file (when using
the text format) or stderr (when using JSON). Together these options allow
simulating different real production configurations and comparing their
performance.
During interactive debugging sessions it is possible to enable per-test output
via -use-testing-log.

View File

@ -18,17 +18,55 @@ package benchmark
import (
"flag"
"fmt"
"os"
"strings"
"testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
_ "k8s.io/component-base/logs/json/register"
"k8s.io/kubernetes/test/utils/ktesting"
)
// TestMain is the entry point of the scheduler_perf test binary. It lowers
// the default log verbosity, rebuilds the command line flag set without the
// pre-registered logging flags, registers the logging flags and a
// feature-gate flag itself, parses the command line and applies the
// resulting logging configuration.
//
// NOTE(review): both framework.EtcdMain(m.Run) and a plain m.Run() appear in
// this body, as do ktesting.DefaultConfig and ktesting.SetDefaultVerbosity;
// presumably only one of each pair is meant to be active - confirm.
func TestMain(m *testing.M) {
// Run with -v=2, this is the default log level in production.
ktesting.DefaultConfig = ktesting.NewConfig(ktesting.Verbosity(2))
ktesting.DefaultConfig.AddFlags(flag.CommandLine)
ktesting.SetDefaultVerbosity(2)
// test/integration/framework/flags.go unconditionally initializes the
// logging flags. That's correct for most tests, but in the
// scheduler_perf test we want more control over the flags, therefore
// here strip them out.
var fs flag.FlagSet
flag.CommandLine.VisitAll(func(f *flag.Flag) {
switch f.Name {
case "log-flush-frequency", "v", "vmodule":
// These will be added below ourselves, don't copy.
default:
fs.Var(f.Value, f.Name, f.Usage)
}
})
// From here on the filtered copy is the process-wide flag set.
flag.CommandLine = &fs
// Expose the logging feature gates through a -feature-gate flag.
featureGate := featuregate.NewFeatureGate()
runtime.Must(logsapi.AddFeatureGates(featureGate))
flag.Var(featureGate, "feature-gate",
"A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(featureGate.KnownFeatures(), "\n"))
c := logsapi.NewLoggingConfiguration()
// This would fail if we hadn't removed the logging flags above.
logsapi.AddGoFlags(c, flag.CommandLine)
flag.Parse()
framework.EtcdMain(m.Run)
logs.InitLogs()
// An invalid logging configuration is reported on stderr and ends the
// process with exit code 1.
if err := logsapi.ValidateAndApply(c, featureGate); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
m.Run()
}

View File

@ -19,9 +19,13 @@ package benchmark
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"strings"
"sync"
"testing"
@ -48,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"sigs.k8s.io/yaml"
)
@ -570,6 +575,39 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
return &so, nil
}
var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
// initTestOutput selects where log output of a test or benchmark goes.
//
// With -use-testing-log the returned writer forwards everything to tb.Log.
// Otherwise output is collected in a file named output.log inside a temporary
// directory of the test; a cleanup operation closes that file and dumps its
// full content with tb.Logf once the test is done. Failures to create, close
// or read the file abort the test.
func initTestOutput(tb testing.TB) io.Writer {
	if *useTestingLog {
		return framework.NewTBWriter(tb)
	}

	logfileName := path.Join(tb.TempDir(), "output.log")
	logFile, err := os.Create(logfileName)
	if err != nil {
		tb.Fatalf("create log file: %v", err)
	}

	// Dump the log output when the test is done. The user can decide how
	// much of it will be visible in case of success: then "go test"
	// truncates, "go test -v" doesn't. All of it will be shown for a
	// failure.
	dumpLog := func() {
		if err := logFile.Close(); err != nil {
			tb.Fatalf("close log file: %v", err)
		}
		content, err := ioutil.ReadFile(logfileName)
		if err != nil {
			tb.Fatalf("read log file: %v", err)
		}
		tb.Logf("full log output:\n%s", string(content))
	}
	tb.Cleanup(dumpLog)

	return logFile
}
func BenchmarkPerfScheduling(b *testing.B) {
testCases, err := getTestCases(configFile)
if err != nil {
@ -579,19 +617,76 @@ func BenchmarkPerfScheduling(b *testing.B) {
b.Fatal(err)
}
output := initTestOutput(b)
// Because we run sequentially, it is possible to change the global
// klog logger and redirect log output. Quite a lot of code still uses
// it instead of supporting contextual logging.
//
// Because we leak one goroutine which calls klog, we cannot restore
// the previous state.
_ = framework.RedirectKlog(b, output)
dataItems := DataItems{Version: "v1"}
for _, tc := range testCases {
b.Run(tc.Name, func(b *testing.B) {
for _, w := range tc.Workloads {
b.Run(w.Name, func(b *testing.B) {
// Ensure that there are no leaked
// goroutines. They could influence
// performance of the next benchmark.
// This must happen *after* RedirectKlog
// because then during cleanup, the
// test will wait for goroutines to
// quit *before* restoring klog settings.
framework.GoleakCheck(b)
ctx := context.Background()
if *useTestingLog {
// In addition to redirecting klog
// output, also enable contextual
// logging.
_, ctx = ktesting.NewTestContext(b)
}
// Now that we are ready to run, start
// etcd.
framework.StartEtcd(b, output)
// 30 minutes should be plenty enough even for the 5000-node tests.
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
b.Cleanup(cancel)
for feature, flag := range tc.FeatureGates {
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
}
dataItems.DataItems = append(dataItems.DataItems, runWorkload(ctx, b, tc, w)...)
results := runWorkload(ctx, b, tc, w)
dataItems.DataItems = append(dataItems.DataItems, results...)
if len(results) > 0 {
// The default ns/op is not
// useful because it includes
// the time spent on
// initialization and shutdown. Here we suppress it.
b.ReportMetric(0, "ns/op")
// Instead, report the same
// results that also get stored
// in the JSON file.
for _, result := range results {
// For some metrics like
// scheduler_framework_extension_point_duration_seconds
// the actual value has some
// other unit. We patch the key
// to make it look right.
metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
for key, value := range result.Data {
b.ReportMetric(value, metric+"/"+key)
}
}
}
// Reset metrics to prevent metrics generated in the current workload from
// getting carried over to the next workload.
legacyregistry.Reset()

View File

@ -0,0 +1,77 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package ktesting is a wrapper around k8s.io/klog/v2/ktesting. It provides
// those (and only those) functions that test code in Kubernetes should use,
// plus better dumping of complex datatypes. It adds the klog command line
// flags and increases the default verbosity to 5.
package ktesting
import (
"context"
"flag"
"fmt"
_ "k8s.io/component-base/logs/testinit"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
// "k8s.io/kubernetes/test/utils/format"
)
// init raises the default log verbosity for all test binaries which import
// this package.
func init() {
	// 5 is a good default for unit tests. Benchmarks should lower it, for
	// example to 2, in their own init function or TestMain.
	const unitTestVerbosity = 5
	SetDefaultVerbosity(unitTestVerbosity)
}
// SetDefaultVerbosity changes the default log verbosity of the program by
// overwriting the current value of the "v" flag in flag.CommandLine. It is
// meant to be called during init, and the "v" flag must already be
// registered at that point.
func SetDefaultVerbosity(v int) {
	level := fmt.Sprintf("%d", v)
	verbosityFlag := flag.CommandLine.Lookup("v")
	// The result of Set is deliberately discarded.
	_ = verbosityFlag.Value.Set(level)
}
// NewTestContext wraps ktesting.NewTestContext with settings specific to
// Kubernetes. It returns a logger which writes through tl, plus a context
// derived from context.Background which carries that logger.
func NewTestContext(tl ktesting.TL) (klog.Logger, context.Context) {
	// TODO (pohly): merge https://github.com/kubernetes/klog/pull/363,
	// new klog release, update and merge
	// https://github.com/kubernetes/kubernetes/pull/115277, then add
	// this option to the list below:
	//
	// ktesting.AnyToString(format.AnyToString),
	config := ktesting.NewConfig(
		ktesting.VerbosityFlagName("v"),
		ktesting.VModuleFlagName("vmodule"),
	)

	// The ktesting logger is not made configurable directly. Instead its
	// flags are registered in a private flag set and then filled with the
	// current values of the corresponding flags in flag.CommandLine.
	var private flag.FlagSet
	config.AddFlags(&private)
	for _, flagName := range []string{"v", "vmodule"} {
		src := flag.CommandLine.Lookup(flagName)
		dst := private.Lookup(flagName)
		if err := dst.Value.Set(src.Value.String()); err != nil {
			panic(err)
		}
	}

	logger := ktesting.NewLogger(tl, config)
	return logger, klog.NewContext(context.Background(), logger)
}