kubernetes/test/integration/scheduler_perf/scheduler_perf_test.go
Patrick Ohly cc4bcd1d8e scheduler_perf: report data items as benchmark results
This replaces the pretty useless us/op metric (useless because it includes
setup and teardown times) with the same values that also get stored in the JSON
file.

The main advantage is that benchstat can be used to analyze and compare
results.
2023-02-28 23:08:23 +01:00

1328 lines
41 KiB
Go

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package benchmark
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path"
"strings"
"sync"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"sigs.k8s.io/yaml"
)
type operationCode string
const (
createNodesOpcode operationCode = "createNodes"
createNamespacesOpcode operationCode = "createNamespaces"
createPodsOpcode operationCode = "createPods"
createPodSetsOpcode operationCode = "createPodSets"
churnOpcode operationCode = "churn"
barrierOpcode operationCode = "barrier"
sleepOpcode operationCode = "sleep"
)
const (
// Two modes supported in "churn" operator.
// Create continuously create API objects without deleting them.
Create = "create"
// Recreate creates a number of API objects and then delete them, and repeat the iteration.
Recreate = "recreate"
)
const (
configFile = "config/performance-config.yaml"
extensionPointsLabelName = "extension_point"
)
var (
defaultMetricsCollectorConfig = metricsCollectorConfig{
Metrics: map[string]*labelValues{
"scheduler_framework_extension_point_duration_seconds": {
label: extensionPointsLabelName,
values: []string{"Filter", "Score"},
},
"scheduler_scheduling_attempt_duration_seconds": nil,
"scheduler_pod_scheduling_duration_seconds": nil,
},
}
)
// testCase defines a set of test cases that intends to test the performance of
// similar workloads of varying sizes with shared overall settings such as
// feature gates and metrics collected.
type testCase struct {
// Name of the testCase.
Name string
// Feature gates to set before running the test.
// Optional
FeatureGates map[featuregate.Feature]bool
// List of metrics to collect. Defaults to
// defaultMetricsCollectorConfig if unspecified.
// Optional
MetricsCollectorConfig *metricsCollectorConfig
// Template for sequence of ops that each workload must follow. Each op will
// be executed serially one after another. Each element of the list must be
// createNodesOp, createPodsOp, or barrierOp.
WorkloadTemplate []op
// List of workloads to run under this testCase.
Workloads []*workload
// SchedulerConfigPath is the path of scheduler configuration
// Optional
SchedulerConfigPath *string
// Default path to spec file describing the pods to create.
// This path can be overridden in createPodsOp by setting PodTemplatePath .
// Optional
DefaultPodTemplatePath *string
}
func (tc *testCase) collectsMetrics() bool {
for _, op := range tc.WorkloadTemplate {
if op.realOp.collectsMetrics() {
return true
}
}
return false
}
func (tc *testCase) workloadNamesUnique() error {
workloadUniqueNames := map[string]bool{}
for _, w := range tc.Workloads {
if workloadUniqueNames[w.Name] {
return fmt.Errorf("%s: workload name %s is not unique", tc.Name, w.Name)
}
workloadUniqueNames[w.Name] = true
}
return nil
}
// workload is a subtest under a testCase that tests the scheduler performance
// for a certain ordering of ops. The set of nodes created and pods scheduled
// in a workload may be heterogeneous.
type workload struct {
// Name of the workload.
Name string
// Values of parameters used in the workloadTemplate.
Params params
}
type params struct {
params map[string]int
// isUsed field records whether params is used or not.
isUsed map[string]bool
}
// UnmarshalJSON is a custom unmarshaler for params.
//
// from(json):
//
// {
// "initNodes": 500,
// "initPods": 50
// }
//
// to:
//
// params{
// params: map[string]int{
// "intNodes": 500,
// "initPods": 50,
// },
// isUsed: map[string]bool{}, // empty map
// }
func (p *params) UnmarshalJSON(b []byte) error {
aux := map[string]int{}
if err := json.Unmarshal(b, &aux); err != nil {
return err
}
p.params = aux
p.isUsed = map[string]bool{}
return nil
}
// get returns param.
func (p params) get(key string) (int, error) {
p.isUsed[key] = true
param, ok := p.params[key]
if ok {
return param, nil
}
return 0, fmt.Errorf("parameter %s is undefined", key)
}
// unusedParams returns the names of unusedParams
func (w workload) unusedParams() []string {
var ret []string
for name := range w.Params.params {
if !w.Params.isUsed[name] {
ret = append(ret, name)
}
}
return ret
}
// op is a dummy struct which stores the real op in itself.
type op struct {
realOp realOp
}
// UnmarshalJSON is a custom unmarshaler for the op struct since we don't know
// which op we're decoding at runtime.
func (op *op) UnmarshalJSON(b []byte) error {
possibleOps := []realOp{
&createNodesOp{},
&createNamespacesOp{},
&createPodsOp{},
&createPodSetsOp{},
&churnOp{},
&barrierOp{},
&sleepOp{},
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
}
var firstError error
for _, possibleOp := range possibleOps {
if err := json.Unmarshal(b, possibleOp); err == nil {
if err2 := possibleOp.isValid(true); err2 == nil {
op.realOp = possibleOp
return nil
} else if firstError == nil {
// Don't return an error yet. Even though this op is invalid, it may
// still match other possible ops.
firstError = err2
}
}
}
return fmt.Errorf("cannot unmarshal %s into any known op type: %w", string(b), firstError)
}
// realOp is an interface that is implemented by different structs. To evaluate
// the validity of ops at parse-time, a isValid function must be implemented.
type realOp interface {
// isValid verifies the validity of the op args such as node/pod count. Note
// that we don't catch undefined parameters at this stage.
isValid(allowParameterization bool) error
// collectsMetrics checks if the op collects metrics.
collectsMetrics() bool
// patchParams returns a patched realOp of the same type after substituting
// parameterizable values with workload-specific values. One should implement
// this method on the value receiver base type, not a pointer receiver base
// type, even though calls will be made from with a *realOp. This is because
// callers don't want the receiver to inadvertently modify the realOp
// (instead, it's returned as a return value).
patchParams(w *workload) (realOp, error)
}
func isValidParameterizable(val string) bool {
return strings.HasPrefix(val, "$")
}
// createNodesOp defines an op where nodes are created as a part of a workload.
type createNodesOp struct {
// Must be "createNodes".
Opcode operationCode
// Number of nodes to create. Parameterizable through CountParam.
Count int
// Template parameter for Count.
CountParam string
// Path to spec file describing the nodes to create.
// Optional
NodeTemplatePath *string
// At most one of the following strategies can be defined. Defaults
// to TrivialNodePrepareStrategy if unspecified.
// Optional
NodeAllocatableStrategy *testutils.NodeAllocatableStrategy
LabelNodePrepareStrategy *testutils.LabelNodePrepareStrategy
UniqueNodeLabelStrategy *testutils.UniqueNodeLabelStrategy
}
func (cno *createNodesOp) isValid(allowParameterization bool) error {
if cno.Opcode != createNodesOpcode {
return fmt.Errorf("invalid opcode %q", cno.Opcode)
}
ok := cno.Count > 0 ||
(cno.CountParam != "" && allowParameterization && isValidParameterizable(cno.CountParam))
if !ok {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cno.Count, cno.CountParam)
}
return nil
}
func (*createNodesOp) collectsMetrics() bool {
return false
}
func (cno createNodesOp) patchParams(w *workload) (realOp, error) {
if cno.CountParam != "" {
var err error
cno.Count, err = w.Params.get(cno.CountParam[1:])
if err != nil {
return nil, err
}
}
return &cno, (&cno).isValid(false)
}
// createNamespacesOp defines an op for creating namespaces
type createNamespacesOp struct {
// Must be "createNamespaces".
Opcode operationCode
// Name prefix of the Namespace. The format is "<prefix>-<number>", where number is
// between 0 and count-1.
Prefix string
// Number of namespaces to create. Parameterizable through CountParam.
Count int
// Template parameter for Count. Takes precedence over Count if both set.
CountParam string
// Path to spec file describing the Namespaces to create.
// Optional
NamespaceTemplatePath *string
}
func (cmo *createNamespacesOp) isValid(allowParameterization bool) error {
if cmo.Opcode != createNamespacesOpcode {
return fmt.Errorf("invalid opcode %q", cmo.Opcode)
}
ok := cmo.Count > 0 ||
(cmo.CountParam != "" && allowParameterization && isValidParameterizable(cmo.CountParam))
if !ok {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cmo.Count, cmo.CountParam)
}
return nil
}
func (*createNamespacesOp) collectsMetrics() bool {
return false
}
func (cmo createNamespacesOp) patchParams(w *workload) (realOp, error) {
if cmo.CountParam != "" {
var err error
cmo.Count, err = w.Params.get(cmo.CountParam[1:])
if err != nil {
return nil, err
}
}
return &cmo, (&cmo).isValid(false)
}
// createPodsOp defines an op where pods are scheduled as a part of a workload.
// The test can block on the completion of this op before moving forward or
// continue asynchronously.
type createPodsOp struct {
// Must be "createPods".
Opcode operationCode
// Number of pods to schedule. Parameterizable through CountParam.
Count int
// Template parameter for Count.
CountParam string
// Whether or not to enable metrics collection for this createPodsOp.
// Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at
// the same time for a particular createPodsOp.
CollectMetrics bool
// Namespace the pods should be created in. Defaults to a unique
// namespace of the format "namespace-<number>".
// Optional
Namespace *string
// Path to spec file describing the pods to schedule.
// If nil, DefaultPodTemplatePath will be used.
// Optional
PodTemplatePath *string
// Whether or not to wait for all pods in this op to get scheduled.
// Defaults to false if not specified.
// Optional
SkipWaitToCompletion bool
// Persistent volume settings for the pods to be scheduled.
// Optional
PersistentVolumeTemplatePath *string
PersistentVolumeClaimTemplatePath *string
}
func (cpo *createPodsOp) isValid(allowParameterization bool) error {
if cpo.Opcode != createPodsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", cpo.Opcode, createPodsOpcode)
}
ok := cpo.Count > 0 ||
(cpo.CountParam != "" && allowParameterization && isValidParameterizable(cpo.CountParam))
if !ok {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
}
if cpo.CollectMetrics && cpo.SkipWaitToCompletion {
// While it's technically possible to achieve this, the additional
// complexity is not worth it, especially given that we don't have any
// use-cases right now.
return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
}
return nil
}
func (cpo *createPodsOp) collectsMetrics() bool {
return cpo.CollectMetrics
}
func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
if cpo.CountParam != "" {
var err error
cpo.Count, err = w.Params.get(cpo.CountParam[1:])
if err != nil {
return nil, err
}
}
return &cpo, (&cpo).isValid(false)
}
// createPodSetsOp defines an op where a set of createPodsOps is created in each unique namespace.
type createPodSetsOp struct {
// Must be "createPodSets".
Opcode operationCode
// Number of sets to create.
Count int
// Template parameter for Count.
CountParam string
// Each set of pods will be created in a namespace of the form namespacePrefix-<number>,
// where number is from 0 to count-1
NamespacePrefix string
// The template of a createPodsOp.
CreatePodsOp createPodsOp
}
func (cpso *createPodSetsOp) isValid(allowParameterization bool) error {
if cpso.Opcode != createPodSetsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", cpso.Opcode, createPodSetsOpcode)
}
ok := cpso.Count > 0 ||
(cpso.CountParam != "" && allowParameterization && isValidParameterizable(cpso.CountParam))
if !ok {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpso.Count, cpso.CountParam)
}
return cpso.CreatePodsOp.isValid(allowParameterization)
}
func (cpso *createPodSetsOp) collectsMetrics() bool {
return cpso.CreatePodsOp.CollectMetrics
}
func (cpso createPodSetsOp) patchParams(w *workload) (realOp, error) {
if cpso.CountParam != "" {
var err error
cpso.Count, err = w.Params.get(cpso.CountParam[1:])
if err != nil {
return nil, err
}
}
return &cpso, (&cpso).isValid(true)
}
// churnOp defines an op where services are created as a part of a workload.
type churnOp struct {
// Must be "churnOp".
Opcode operationCode
// Value must be one of the followings:
// - recreate. In this mode, API objects will be created for N cycles, and then
// deleted in the next N cycles. N is specified by the "Number" field.
// - create. In this mode, API objects will be created (without deletion) until
// reaching a threshold - which is specified by the "Number" field.
Mode string
// Maximum number of API objects to be created.
// Defaults to 0, which means unlimited.
Number int
// Intervals of churning. Defaults to 500 millisecond.
IntervalMilliseconds int64
// Namespace the churning objects should be created in. Defaults to a unique
// namespace of the format "namespace-<number>".
// Optional
Namespace *string
// Path of API spec files.
TemplatePaths []string
}
func (co *churnOp) isValid(_ bool) error {
if co.Opcode != churnOpcode {
return fmt.Errorf("invalid opcode %q", co.Opcode)
}
if co.Mode != Recreate && co.Mode != Create {
return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
}
if co.Number < 0 {
return fmt.Errorf("number (%v) cannot be negative", co.Number)
}
if co.Mode == Recreate && co.Number == 0 {
return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
}
if len(co.TemplatePaths) == 0 {
return fmt.Errorf("at least one template spec file needs to be specified")
}
return nil
}
func (*churnOp) collectsMetrics() bool {
return false
}
func (co churnOp) patchParams(w *workload) (realOp, error) {
return &co, nil
}
// barrierOp defines an op that can be used to wait until all scheduled pods of
// one or many namespaces have been bound to nodes. This is useful when pods
// were scheduled with SkipWaitToCompletion set to true.
type barrierOp struct {
// Must be "barrier".
Opcode operationCode
// Namespaces to block on. Empty array or not specifying this field signifies
// that the barrier should block on all namespaces.
Namespaces []string
}
func (bo *barrierOp) isValid(allowParameterization bool) error {
if bo.Opcode != barrierOpcode {
return fmt.Errorf("invalid opcode %q", bo.Opcode)
}
return nil
}
func (*barrierOp) collectsMetrics() bool {
return false
}
func (bo barrierOp) patchParams(w *workload) (realOp, error) {
return &bo, nil
}
// sleepOp defines an op that can be used to sleep for a specified amount of time.
// This is useful in simulating workloads that require some sort of time-based synchronisation.
type sleepOp struct {
// Must be "sleep".
Opcode operationCode
// duration of sleep.
Duration time.Duration
}
func (so *sleepOp) UnmarshalJSON(data []byte) (err error) {
var tmp struct {
Opcode operationCode
Duration string
}
if err = json.Unmarshal(data, &tmp); err != nil {
return err
}
so.Opcode = tmp.Opcode
so.Duration, err = time.ParseDuration(tmp.Duration)
return err
}
func (so *sleepOp) isValid(_ bool) error {
if so.Opcode != sleepOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", so.Opcode, sleepOpcode)
}
return nil
}
func (so *sleepOp) collectsMetrics() bool {
return false
}
func (so sleepOp) patchParams(_ *workload) (realOp, error) {
return &so, nil
}
var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
func initTestOutput(tb testing.TB) io.Writer {
var output io.Writer
if *useTestingLog {
output = framework.NewTBWriter(tb)
} else {
tmpDir := tb.TempDir()
logfileName := path.Join(tmpDir, "output.log")
fileOutput, err := os.Create(logfileName)
if err != nil {
tb.Fatalf("create log file: %v", err)
}
output = fileOutput
tb.Cleanup(func() {
// Dump the log output when the test is done. The user
// can decide how much of it will be visible in case of
// success: then "go test" truncates, "go test -v"
// doesn't. All of it will be shown for a failure.
if err := fileOutput.Close(); err != nil {
tb.Fatalf("close log file: %v", err)
}
log, err := ioutil.ReadFile(logfileName)
if err != nil {
tb.Fatalf("read log file: %v", err)
}
tb.Logf("full log output:\n%s", string(log))
})
}
return output
}
func BenchmarkPerfScheduling(b *testing.B) {
testCases, err := getTestCases(configFile)
if err != nil {
b.Fatal(err)
}
if err = validateTestCases(testCases); err != nil {
b.Fatal(err)
}
output := initTestOutput(b)
// Because we run sequentially, it is possible to change the global
// klog logger and redirect log output. Quite a lot of code still uses
// it instead of supporting contextual logging.
//
// Because we leak one goroutine which calls klog, we cannot restore
// the previous state.
_ = framework.RedirectKlog(b, output)
dataItems := DataItems{Version: "v1"}
for _, tc := range testCases {
b.Run(tc.Name, func(b *testing.B) {
for _, w := range tc.Workloads {
b.Run(w.Name, func(b *testing.B) {
// Ensure that there are no leaked
// goroutines. They could influence
// performance of the next benchmark.
// This must *after* RedirectKlog
// because then during cleanup, the
// test will wait for goroutines to
// quit *before* restoring klog settings.
framework.GoleakCheck(b)
ctx := context.Background()
if *useTestingLog {
// In addition to redirection klog
// output, also enable contextual
// logging.
_, ctx = ktesting.NewTestContext(b)
}
// Now that we are ready to run, start
// etcd.
framework.StartEtcd(b, output)
// 30 minutes should be plenty enough even for the 5000-node tests.
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
b.Cleanup(cancel)
for feature, flag := range tc.FeatureGates {
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
}
results := runWorkload(ctx, b, tc, w)
dataItems.DataItems = append(dataItems.DataItems, results...)
if len(results) > 0 {
// The default ns/op is not
// useful because it includes
// the time spent on
// initialization and shutdown. Here we suppress it.
b.ReportMetric(0, "ns/op")
// Instead, report the same
// results that also get stored
// in the JSON file.
for _, result := range results {
// For some metrics like
// scheduler_framework_extension_point_duration_seconds
// the actual value has some
// other unit. We patch the key
// to make it look right.
metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
for key, value := range result.Data {
b.ReportMetric(value, metric+"/"+key)
}
}
}
// Reset metrics to prevent metrics generated in current workload gets
// carried over to the next workload.
legacyregistry.Reset()
})
}
})
}
if err := dataItems2JSONFile(dataItems, b.Name()); err != nil {
b.Fatalf("unable to write measured data %+v: %v", dataItems, err)
}
}
func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
data, err := os.ReadFile(file)
if err != nil {
return nil, err
}
// The UniversalDecoder runs defaulting and returns the internal type by default.
obj, gvk, err := scheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil {
return nil, err
}
if cfgObj, ok := obj.(*config.KubeSchedulerConfiguration); ok {
return cfgObj, nil
}
return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
}
func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
var unrolled []op
for opIndex, o := range wt {
realOp, err := o.realOp.patchParams(w)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
switch concreteOp := realOp.(type) {
case *createPodSetsOp:
b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
for i := 0; i < concreteOp.Count; i++ {
copy := concreteOp.CreatePodsOp
ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i)
copy.Namespace = &ns
unrolled = append(unrolled, op{realOp: &copy})
}
default:
unrolled = append(unrolled, o)
}
}
return unrolled
}
func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem {
var cfg *config.KubeSchedulerConfiguration
var err error
if tc.SchedulerConfigPath != nil {
cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath)
if err != nil {
b.Fatalf("error loading scheduler config file: %v", err)
}
if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
b.Fatalf("validate scheduler config file failed: %v", err)
}
}
finalFunc, podInformer, client, dynClient := mustSetupScheduler(ctx, b, cfg)
b.Cleanup(finalFunc)
var mu sync.Mutex
var dataItems []DataItem
nextNodeIndex := 0
// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
numPodsScheduledPerNamespace := make(map[string]int)
b.Cleanup(func() {
for namespace := range numPodsScheduledPerNamespace {
if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
}
}
})
for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-ctx.Done():
b.Fatalf("op %d: %v", opIndex, ctx.Err())
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
b.Cleanup(func() {
if err := nodePreparer.CleanupNodes(ctx); err != nil {
b.Fatalf("failed to clean up nodes, error: %v", err)
}
})
nextNodeIndex += concreteOp.Count
case *createNamespacesOp:
nsPreparer, err := newNamespacePreparer(concreteOp, client, b)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(ctx); err != nil {
nsPreparer.cleanup(ctx)
b.Fatalf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := numPodsScheduledPerNamespace[n]; ok {
// this namespace has been already created.
continue
}
numPodsScheduledPerNamespace[n] = 0
}
case *createPodsOp:
var namespace string
// define Pod's namespace automatically, and create that namespace.
namespace = fmt.Sprintf("namespace-%d", opIndex)
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
}
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
// The namespace has not created yet.
// So, creat that and register it to numPodsScheduledPerNamespace.
_, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
if err != nil {
b.Fatalf("failed to create namespace for Pod: %v", namespace)
}
numPodsScheduledPerNamespace[namespace] = 0
}
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}
var collectors []testDataCollector
var collectorCtx context.Context
var collectorCancel func()
if concreteOp.CollectMetrics {
collectorCtx, collectorCancel = context.WithCancel(ctx)
defer collectorCancel()
collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
for _, collector := range collectors {
go collector.run(collectorCtx)
}
}
if err := createPods(ctx, b, namespace, concreteOp, client); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
// Only record those namespaces that may potentially require barriers
// in the future.
if _, ok := numPodsScheduledPerNamespace[namespace]; ok {
numPodsScheduledPerNamespace[namespace] += concreteOp.Count
} else {
numPodsScheduledPerNamespace[namespace] = concreteOp.Count
}
} else {
if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil {
b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if concreteOp.CollectMetrics {
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
collectorCancel()
mu.Lock()
for _, collector := range collectors {
dataItems = append(dataItems, collector.collect()...)
}
mu.Unlock()
}
if !concreteOp.SkipWaitToCompletion {
// SkipWaitToCompletion=false indicates this step has waited for the Pods to be scheduled.
// So we reset the metrics in global registry; otherwise metrics gathered in this step
// will be carried over to next step.
legacyregistry.Reset()
}
case *churnOp:
var namespace string
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
} else {
namespace = fmt.Sprintf("namespace-%d", opIndex)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(client.Discovery()))
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
var dynRes dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
dynRes = dynClient.Resource(gvr).Namespace(namespace)
} else {
dynRes = dynClient.Resource(gvr)
}
churnFns = append(churnFns, func(name string) string {
if name != "" {
dynRes.Delete(ctx, name, metav1.DeleteOptions{})
return ""
}
live, err := dynRes.Create(ctx, unstructuredObj, metav1.CreateOptions{})
if err != nil {
return ""
}
return live.GetName()
})
}
var interval int64 = 500
if concreteOp.IntervalMilliseconds != 0 {
interval = concreteOp.IntervalMilliseconds
}
ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
switch concreteOp.Mode {
case Create:
go func() {
count, threshold := 0, concreteOp.Number
if threshold == 0 {
threshold = math.MaxInt32
}
for count < threshold {
select {
case <-ticker.C:
for i := range churnFns {
churnFns[i]("")
}
count++
case <-ctx.Done():
return
}
}
}()
case Recreate:
go func() {
retVals := make([][]string, len(churnFns))
// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
for i := range retVals {
retVals[i] = make([]string, concreteOp.Number)
}
count := 0
for {
select {
case <-ticker.C:
for i := range churnFns {
retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
}
count++
case <-ctx.Done():
return
}
}
}()
}
case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
b.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
if len(concreteOp.Namespaces) == 0 {
numPodsScheduledPerNamespace = make(map[string]int)
} else {
for _, namespace := range concreteOp.Namespaces {
delete(numPodsScheduledPerNamespace, namespace)
}
}
case *sleepOp:
select {
case <-ctx.Done():
case <-time.After(concreteOp.Duration):
}
default:
b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
}
}
// check unused params and inform users
unusedParams := w.unusedParams()
if len(unusedParams) != 0 {
b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
}
// Some tests have unschedulable pods. Do not add an implicit barrier at the
// end as we do not want to wait for them.
return dataItems
}
type testDataCollector interface {
run(ctx context.Context)
collect() []DataItem
}
func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
if mcc == nil {
mcc = &defaultMetricsCollectorConfig
}
return []testDataCollector{
newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}),
newMetricsCollector(mcc, map[string]string{"Name": name}),
}
}
func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Interface) (testutils.TestNodePreparer, error) {
var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
if cno.NodeAllocatableStrategy != nil {
nodeStrategy = cno.NodeAllocatableStrategy
} else if cno.LabelNodePrepareStrategy != nil {
nodeStrategy = cno.LabelNodePrepareStrategy
} else if cno.UniqueNodeLabelStrategy != nil {
nodeStrategy = cno.UniqueNodeLabelStrategy
}
if cno.NodeTemplatePath != nil {
node, err := getNodeSpecFromFile(cno.NodeTemplatePath)
if err != nil {
return nil, err
}
return framework.NewIntegrationTestNodePreparerWithNodeSpec(
clientset,
[]testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
node,
), nil
}
return framework.NewIntegrationTestNodePreparer(
clientset,
[]testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
prefix,
), nil
}
func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
strategy, err := getPodStrategy(cpo)
if err != nil {
return err
}
b.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy(namespace, cpo.Count, strategy)
podCreator := testutils.NewTestPodCreator(clientset, config)
return podCreator.CreatePods(ctx)
}
// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
default:
}
scheduled, err := getScheduledPods(podInformer, namespace)
if err != nil {
return false, err
}
if len(scheduled) >= wantCount {
b.Logf("scheduling succeed")
return true, nil
}
b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
return false, nil
})
}
// waitUntilPodsScheduled blocks until the all pods in the given namespaces are
// scheduled.
func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {
namespaces = append(namespaces, namespace)
}
}
for _, namespace := range namespaces {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
wantCount, ok := numPodsScheduledPerNamespace[namespace]
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}
return nil
}
func getSpecFromFile(path *string, spec interface{}) error {
bytes, err := os.ReadFile(*path)
if err != nil {
return err
}
return yaml.UnmarshalStrict(bytes, spec)
}
func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
bytes, err := os.ReadFile(path)
if err != nil {
return nil, nil, err
}
bytes, err = yaml.YAMLToJSONStrict(bytes)
if err != nil {
return nil, nil, fmt.Errorf("cannot covert YAML to JSON: %v", err)
}
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
if err != nil {
return nil, nil, err
}
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
}
return unstructuredObj, gvk, nil
}
func getTestCases(path string) ([]*testCase, error) {
testCases := make([]*testCase, 0)
if err := getSpecFromFile(&path, &testCases); err != nil {
return nil, fmt.Errorf("parsing test cases error: %w", err)
}
return testCases, nil
}
func validateTestCases(testCases []*testCase) error {
if len(testCases) == 0 {
return fmt.Errorf("no test cases defined")
}
testCaseUniqueNames := map[string]bool{}
for _, tc := range testCases {
if testCaseUniqueNames[tc.Name] {
return fmt.Errorf("%s: name is not unique", tc.Name)
}
testCaseUniqueNames[tc.Name] = true
if len(tc.Workloads) == 0 {
return fmt.Errorf("%s: no workloads defined", tc.Name)
}
if err := tc.workloadNamesUnique(); err != nil {
return err
}
if len(tc.WorkloadTemplate) == 0 {
return fmt.Errorf("%s: no ops defined", tc.Name)
}
// Make sure there's at least one CreatePods op with collectMetrics set to
// true in each workload. What's the point of running a performance
// benchmark if no statistics are collected for reporting?
if !tc.collectsMetrics() {
return fmt.Errorf("%s: no op in the workload template collects metrics", tc.Name)
}
// TODO(#93795): make sure each workload within a test case has a unique
// name? The name is used to identify the stats in benchmark reports.
// TODO(#94404): check for unused template parameters? Probably a typo.
}
return nil
}
func getPodStrategy(cpo *createPodsOp) (testutils.TestPodCreateStrategy, error) {
basePod := makeBasePod()
if cpo.PodTemplatePath != nil {
var err error
basePod, err = getPodSpecFromFile(cpo.PodTemplatePath)
if err != nil {
return nil, err
}
}
if cpo.PersistentVolumeClaimTemplatePath == nil {
return testutils.NewCustomCreatePodStrategy(basePod), nil
}
pvTemplate, err := getPersistentVolumeSpecFromFile(cpo.PersistentVolumeTemplatePath)
if err != nil {
return nil, err
}
pvcTemplate, err := getPersistentVolumeClaimSpecFromFile(cpo.PersistentVolumeClaimTemplatePath)
if err != nil {
return nil, err
}
return testutils.NewCreatePodWithPersistentVolumeStrategy(pvcTemplate, getCustomVolumeFactory(pvTemplate), basePod), nil
}
func getNodeSpecFromFile(path *string) (*v1.Node, error) {
nodeSpec := &v1.Node{}
if err := getSpecFromFile(path, nodeSpec); err != nil {
return nil, fmt.Errorf("parsing Node: %w", err)
}
return nodeSpec, nil
}
func getPodSpecFromFile(path *string) (*v1.Pod, error) {
podSpec := &v1.Pod{}
if err := getSpecFromFile(path, podSpec); err != nil {
return nil, fmt.Errorf("parsing Pod: %w", err)
}
return podSpec, nil
}
func getPersistentVolumeSpecFromFile(path *string) (*v1.PersistentVolume, error) {
persistentVolumeSpec := &v1.PersistentVolume{}
if err := getSpecFromFile(path, persistentVolumeSpec); err != nil {
return nil, fmt.Errorf("parsing PersistentVolume: %w", err)
}
return persistentVolumeSpec, nil
}
func getPersistentVolumeClaimSpecFromFile(path *string) (*v1.PersistentVolumeClaim, error) {
persistentVolumeClaimSpec := &v1.PersistentVolumeClaim{}
if err := getSpecFromFile(path, persistentVolumeClaimSpec); err != nil {
return nil, fmt.Errorf("parsing PersistentVolumeClaim: %w", err)
}
return persistentVolumeClaimSpec, nil
}
func getCustomVolumeFactory(pvTemplate *v1.PersistentVolume) func(id int) *v1.PersistentVolume {
return func(id int) *v1.PersistentVolume {
pv := pvTemplate.DeepCopy()
volumeID := fmt.Sprintf("vol-%d", id)
pv.ObjectMeta.Name = volumeID
pvs := pv.Spec.PersistentVolumeSource
if pvs.CSI != nil {
pvs.CSI.VolumeHandle = volumeID
} else if pvs.AWSElasticBlockStore != nil {
pvs.AWSElasticBlockStore.VolumeID = volumeID
}
return pv
}
}
// namespacePreparer holds configuration information for the test namespace preparer.
type namespacePreparer struct {
client clientset.Interface
count int
prefix string
spec *v1.Namespace
t testing.TB
}
func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) {
ns := &v1.Namespace{}
if cno.NamespaceTemplatePath != nil {
if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {
return nil, fmt.Errorf("parsing NamespaceTemplate: %w", err)
}
}
return &namespacePreparer{
client: clientset,
count: cno.Count,
prefix: cno.Prefix,
spec: ns,
t: b,
}, nil
}
// namespaces returns namespace names have been (or will be) created by this namespacePreparer
func (p *namespacePreparer) namespaces() []string {
namespaces := make([]string, p.count)
for i := 0; i < p.count; i++ {
namespaces[i] = fmt.Sprintf("%s-%d", p.prefix, i)
}
return namespaces
}
// prepare creates the namespaces.
func (p *namespacePreparer) prepare(ctx context.Context) error {
base := &v1.Namespace{}
if p.spec != nil {
base = p.spec
}
p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
for i := 0; i < p.count; i++ {
n := base.DeepCopy()
n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
_, err := p.client.CoreV1().Namespaces().Create(ctx, n, metav1.CreateOptions{})
return err == nil || apierrors.IsAlreadyExists(err), nil
}); err != nil {
return err
}
}
return nil
}
// cleanup deletes existing test namespaces.
func (p *namespacePreparer) cleanup(ctx context.Context) error {
var errRet error
for i := 0; i < p.count; i++ {
n := fmt.Sprintf("%s-%d", p.prefix, i)
if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
p.t.Errorf("Deleting Namespace: %v", err)
errRet = err
}
}
return errRet
}