scheduler_perf: add TestScheduling integration test

This runs workloads that are labeled as "integration-test". The apiserver and
scheduler are started only once per unique configuration; each workload using
that configuration then runs against them. This makes execution faster. In
contrast to benchmarking, we care less about starting with a clean slate for
each test.
Patrick Ohly 2023-03-23 17:01:42 +01:00
parent dfd646e0a8
commit cecebe8ea2
4 changed files with 220 additions and 86 deletions


@ -100,3 +100,15 @@ performance.
During interactive debugging sessions it is possible to enable per-test output
via -use-testing-log.
## Integration tests
To run integration tests, use:
```
make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS=-use-testing-log
```
Integration testing uses the same `config/performance-config.yaml` as
benchmarking. By default, only workloads labeled `integration-test` are
executed as part of integration testing; `-test-scheduling-label-filter` can be
used to change that selection.
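For example, a run limited to the workloads carrying the `fast` label (an
illustrative invocation; the label value is just an example) could look like:
```
make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS="-use-testing-log -test-scheduling-label-filter=fast"
```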


@ -1,3 +1,17 @@
# The following labels are used in this file:
# - fast: short execution time, ideally less than 30 seconds
# - integration-test: used to select workloads that
# run in pull-kubernetes-integration. Choosing those tests
# is a tradeoff between code coverage and overall runtime.
# - performance: used to select workloads that run
# in ci-benchmark-scheduler-perf. Such workloads
# must run long enough (ideally, longer than 10 seconds)
# to provide meaningful samples for the pod scheduling
# rate.
#
# Combining "performance" and "fast" selects suitable workloads for local
# before/after comparisons with benchstat.
- name: SchedulingBasic
defaultPodTemplatePath: config/pod-default.yaml
workloadTemplate:
@ -10,7 +24,7 @@
collectMetrics: true
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 500
@ -39,7 +53,7 @@
namespace: sched-1
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 100
@ -161,7 +175,7 @@
collectMetrics: true
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 500
@ -223,7 +237,7 @@
collectMetrics: true
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 500
@ -308,7 +322,7 @@
collectMetrics: true
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 1000
@ -386,7 +400,7 @@
collectMetrics: true
workloads:
- name: 500Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 500
initPods: 200
@ -504,7 +518,7 @@
collectMetrics: true
workloads:
- name: 1000Nodes
labels: [fast]
labels: [integration-test, fast]
params:
initNodes: 1000
measurePods: 1000
@ -734,6 +748,7 @@
collectMetrics: true
workloads:
- name: fast
labels: [integration-test, fast]
params:
# This testcase runs through all code paths without
# taking too long overall.
@ -743,7 +758,7 @@
measurePods: 10
maxClaimsPerNode: 10
- name: 2000pods_100nodes
labels: [performance,fast]
labels: [performance, fast]
params:
# In this testcase, the number of nodes is smaller
# than the limit for the PodScheduling slices.
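As the comment at the top of this config file notes, combining the `performance` and
`fast` labels selects workloads suited for local before/after comparisons with
benchstat. A rough sketch of such a comparison (the exact invocation is an assumption,
not part of this change; it presumes a local etcd and the benchstat tool are available):
```
# Run the selected benchmarks on the baseline tree, then repeat on the modified tree.
go test -timeout=0 -bench=BenchmarkPerfScheduling ./test/integration/scheduler_perf \
  -perf-scheduling-label-filter=performance,fast | tee before.txt
# ... apply the change, rerun into after.txt, then compare the two runs:
benchstat before.txt after.txt
```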


@ -30,6 +30,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
@ -43,6 +44,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
@ -128,7 +130,7 @@ type testCase struct {
Workloads []*workload
// SchedulerConfigPath is the path of scheduler configuration
// Optional
SchedulerConfigPath *string
SchedulerConfigPath string
// Default path to spec file describing the pods to create.
// This path can be overridden in createPodsOp by setting PodTemplatePath.
// Optional
@ -640,7 +642,7 @@ func initTestOutput(tb testing.TB) io.Writer {
return output
}
var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-)")
var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")
func BenchmarkPerfScheduling(b *testing.B) {
testCases, err := getTestCases(configFile)
@ -699,7 +701,8 @@ func BenchmarkPerfScheduling(b *testing.B) {
for feature, flag := range tc.FeatureGates {
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
}
results := runWorkload(ctx, b, tc, w, false)
informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates)
results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false)
dataItems.DataItems = append(dataItems.DataItems, results...)
if len(results) > 0 {
@ -737,6 +740,95 @@ func BenchmarkPerfScheduling(b *testing.B) {
}
}
var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
func TestScheduling(t *testing.T) {
testCases, err := getTestCases(configFile)
if err != nil {
t.Fatal(err)
}
if err = validateTestCases(testCases); err != nil {
t.Fatal(err)
}
// Check for leaks at the very end.
framework.GoleakCheck(t)
// All integration test cases share the same etcd, similar to
// https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222.
framework.StartEtcd(t, nil)
// Workloads with the same configuration share the same apiserver. For that
// we first need to determine what those different configs are.
var configs []schedulerConfig
for _, tc := range testCases {
tcEnabled := false
for _, w := range tc.Workloads {
if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
tcEnabled = true
break
}
}
if !tcEnabled {
continue
}
exists := false
for _, config := range configs {
if config.equals(tc) {
exists = true
break
}
}
if !exists {
configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates})
}
}
for _, config := range configs {
// Not a sub test because we don't have a good name for it.
func() {
_, ctx := ktesting.NewTestContext(t)
// No timeout here because the `go test -timeout` will ensure that
// the test doesn't get stuck forever.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for feature, flag := range config.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)()
}
informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates)
for _, tc := range testCases {
if !config.equals(tc) {
// Runs with some other config.
continue
}
t.Run(tc.Name, func(t *testing.T) {
for _, w := range tc.Workloads {
t.Run(w.Name, func(t *testing.T) {
if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter)
}
_, ctx := ktesting.NewTestContext(t)
runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true)
})
}
})
}
}()
}
}
type schedulerConfig struct {
schedulerConfigPath string
featureGates map[featuregate.Feature]bool
}
func (c schedulerConfig) equals(tc *testCase) bool {
return c.schedulerConfigPath == tc.SchedulerConfigPath &&
cmp.Equal(c.featureGates, tc.FeatureGates)
}
func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
data, err := os.ReadFile(file)
if err != nil {
@ -753,16 +845,16 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error
return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
}
func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op {
var unrolled []op
for opIndex, o := range wt {
realOp, err := o.realOp.patchParams(w)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
switch concreteOp := realOp.(type) {
case *createPodSetsOp:
b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
for i := 0; i < concreteOp.Count; i++ {
copy := concreteOp.CreatePodsOp
ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i)
@ -776,28 +868,43 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
return unrolled
}
func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, cleanup bool) []DataItem {
start := time.Now()
b.Cleanup(func() {
duration := time.Now().Sub(start)
// This includes startup and shutdown time and thus does not
// reflect scheduling performance. It's useful to get a feeling
// for how long each workload runs overall.
b.ReportMetric(duration.Seconds(), "runtime_seconds")
})
func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
var cfg *config.KubeSchedulerConfiguration
var err error
if tc.SchedulerConfigPath != nil {
cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath)
if configPath != "" {
cfg, err = loadSchedulerConfig(configPath)
if err != nil {
b.Fatalf("error loading scheduler config file: %v", err)
tb.Fatalf("error loading scheduler config file: %v", err)
}
if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
b.Fatalf("validate scheduler config file failed: %v", err)
tb.Fatalf("validate scheduler config file failed: %v", err)
}
}
informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates)
return mustSetupCluster(ctx, tb, cfg, featureGates)
}
func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem {
b, benchmarking := tb.(*testing.B)
if benchmarking {
start := time.Now()
b.Cleanup(func() {
duration := time.Since(start)
// This includes startup and shutdown time and thus does not
// reflect scheduling performance. It's useful to get a feeling
// for how long each workload runs overall.
b.ReportMetric(duration.Seconds(), "runtime_seconds")
})
}
// Disable error checking of the sampling interval length in the
// throughput collector by default. When running benchmarks, report
// it as test failure when samples are not taken regularly.
var throughputErrorMargin float64
if benchmarking {
// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
// one day, we should think of another approach to avoid this trick.
throughputErrorMargin = 30
}
// Additional informers needed for testing. The pod informer was
// already created before (scheduler.NewInformerFactory) and the
@ -820,45 +927,45 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
if cleanup {
// This must run before controllers get shut down.
defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace)
defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace)
}
for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) {
for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
select {
case <-ctx.Done():
b.Fatalf("op %d: %v", opIndex, ctx.Err())
tb.Fatalf("op %d: %v", opIndex, ctx.Err())
default:
}
switch concreteOp := realOp.(type) {
case *createNodesOp:
nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
if cleanup {
b.Cleanup(func() {
defer func() {
if err := nodePreparer.CleanupNodes(ctx); err != nil {
b.Fatalf("failed to clean up nodes, error: %v", err)
tb.Fatalf("failed to clean up nodes, error: %v", err)
}
})
}()
}
nextNodeIndex += concreteOp.Count
case *createNamespacesOp:
nsPreparer, err := newNamespacePreparer(concreteOp, client, b)
nsPreparer, err := newNamespacePreparer(concreteOp, client, tb)
if err != nil {
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
if err := nsPreparer.prepare(ctx); err != nil {
nsPreparer.cleanup(ctx)
b.Fatalf("op %d: %v", opIndex, err)
tb.Fatalf("op %d: %v", opIndex, err)
}
for _, n := range nsPreparer.namespaces() {
if _, ok := numPodsScheduledPerNamespace[n]; ok {
@ -875,7 +982,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
if concreteOp.Namespace != nil {
namespace = *concreteOp.Namespace
}
createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}
@ -891,7 +998,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
if concreteOp.CollectMetrics {
collectorCtx, collectorCancel = context.WithCancel(ctx)
defer collectorCancel()
collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", tb.Name(), namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
for _, collector := range collectors {
// Need loop-local variable for function below.
collector := collector
@ -902,8 +1009,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
}()
}
}
if err := createPods(ctx, b, namespace, concreteOp, client); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
}
if concreteOp.SkipWaitToCompletion {
// Only record those namespaces that may potentially require barriers
@ -914,8 +1021,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
numPodsScheduledPerNamespace[namespace] = concreteOp.Count
}
} else {
if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil {
b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil {
tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
if concreteOp.CollectMetrics {
@ -949,7 +1056,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
// Ensure the namespace exists.
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
}
var churnFns []func(name string) string
@ -957,12 +1064,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
for i, path := range concreteOp.TemplatePaths {
unstructuredObj, gvk, err := getUnstructuredFromFile(path)
if err != nil {
b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
}
// Obtain GVR.
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
}
gvr := mapping.Resource
// Distinguish cluster-scoped with namespaced API objects.
@ -1043,11 +1150,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
case *barrierOp:
for _, namespace := range concreteOp.Namespaces {
if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
b.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
}
}
if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
b.Fatalf("op %d: %v", opIndex, err)
if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tb.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
// pending scheduling in the namespaces that we just blocked on.
@ -1067,19 +1174,19 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
default:
runable, ok := concreteOp.(runnableOp)
if !ok {
b.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
}
for _, namespace := range runable.requiredNamespaces() {
createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace)
createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace)
}
runable.run(ctx, b, client)
runable.run(ctx, tb, client)
}
}
// check unused params and inform users
unusedParams := w.unusedParams()
if len(unusedParams) != 0 {
b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
}
// Some tests have unschedulable pods. Do not add an implicit barrier at the
@ -1151,13 +1258,13 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl
}).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces")
}
func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
if _, ok := (*podsPerNamespace)[namespace]; !ok {
// The namespace has not created yet.
// So, create that and register it.
_, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
if err != nil {
b.Fatalf("failed to create namespace for Pod: %v", namespace)
tb.Fatalf("failed to create namespace for Pod: %v", namespace)
}
(*podsPerNamespace)[namespace] = 0
}
@ -1168,12 +1275,12 @@ type testDataCollector interface {
collect() []DataItem
}
func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
if mcc == nil {
mcc = &defaultMetricsCollectorConfig
}
return []testDataCollector{
newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}),
newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
newMetricsCollector(mcc, map[string]string{"Name": name}),
}
}
@ -1206,12 +1313,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte
), nil
}
func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error {
strategy, err := getPodStrategy(cpo)
if err != nil {
return err
}
b.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy(namespace, cpo.Count, strategy)
podCreator := testutils.NewTestPodCreator(clientset, config)
@ -1221,7 +1328,7 @@ func createPods(ctx context.Context, b *testing.B, namespace string, cpo *create
// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) {
select {
case <-ctx.Done():
@ -1233,17 +1340,17 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf
return false, err
}
if len(scheduled) >= wantCount {
b.Logf("scheduling succeed")
tb.Logf("scheduling succeed")
return true, nil
}
b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
return false, nil
})
}
// waitUntilPodsScheduled blocks until the all pods in the given namespaces are
// scheduled.
func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {
@ -1260,7 +1367,7 @@ func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer corei
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil {
if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}
@ -1414,10 +1521,10 @@ type namespacePreparer struct {
count int
prefix string
spec *v1.Namespace
t testing.TB
tb testing.TB
}
func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) {
func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) {
ns := &v1.Namespace{}
if cno.NamespaceTemplatePath != nil {
if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {
@ -1430,7 +1537,7 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface
count: cno.Count,
prefix: cno.Prefix,
spec: ns,
t: b,
tb: tb,
}, nil
}
@ -1449,7 +1556,7 @@ func (p *namespacePreparer) prepare(ctx context.Context) error {
if p.spec != nil {
base = p.spec
}
p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
for i := 0; i < p.count; i++ {
n := base.DeepCopy()
n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
@ -1469,7 +1576,7 @@ func (p *namespacePreparer) cleanup(ctx context.Context) error {
for i := 0; i < p.count; i++ {
n := fmt.Sprintf("%s-%d", p.prefix, i)
if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
p.t.Errorf("Deleting Namespace: %v", err)
p.tb.Errorf("Deleting Namespace: %v", err)
errRet = err
}
}


@ -73,7 +73,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
return &cfg, nil
}
// mustSetupScheduler starts the following components:
// mustSetupCluster starts the following components:
// - k8s api server
// - scheduler
// - some of the kube-controller-manager controllers
@ -82,11 +82,11 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
// Run API server with minimimal logging by default. Can be raised with -v.
framework.MinVerbosity = 0
_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, b, framework.TestServerSetup{
_, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
@ -99,12 +99,12 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche
}
},
})
b.Cleanup(tearDownFn)
tb.Cleanup(tearDownFn)
// Cleanup will be in reverse order: first the clients get cancelled,
// then the apiserver is torn down.
ctx, cancel := context.WithCancel(ctx)
b.Cleanup(cancel)
tb.Cleanup(cancel)
// TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to
// support this when there is any testcase that depends on such configuration.
@ -117,7 +117,7 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche
var err error
config, err = newDefaultComponentConfig()
if err != nil {
b.Fatalf("Error creating default component config: %v", err)
tb.Fatalf("Error creating default component config: %v", err)
}
}
@ -128,14 +128,14 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche
// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
_, informerFactory := util.StartScheduler(ctx, client, cfg, config)
util.StartFakePVController(ctx, client, informerFactory)
runGC := util.CreateGCController(ctx, b, *cfg, informerFactory)
runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory)
runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory)
runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory)
runResourceClaimController := func() {}
if enabledFeatures[features.DynamicResourceAllocation] {
// Testing of DRA with inline resource claims depends on this
// controller for creating and removing ResourceClaims.
runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory)
runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory)
}
informerFactory.Start(ctx.Done())
@ -320,14 +320,16 @@ type throughputCollector struct {
schedulingThroughputs []float64
labels map[string]string
namespaces []string
errorMargin float64
}
func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
return &throughputCollector{
tb: tb,
podInformer: podInformer,
labels: labels,
namespaces: namespaces,
errorMargin: errorMargin,
}
}
@ -388,9 +390,7 @@ func (tc *throughputCollector) run(ctx context.Context) {
throughput := float64(newScheduled) / durationInSeconds
expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
// one day, we should think of another approach to avoid this trick.
if math.Abs(errorMargin) > 30 {
if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
// This might affect the result, report it.
tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
}