diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md index 91cf677e4ad..618e48a3be4 100644 --- a/test/integration/scheduler_perf/README.md +++ b/test/integration/scheduler_perf/README.md @@ -100,3 +100,15 @@ performance. During interactive debugging sessions it is possible to enable per-test output via -use-testing-log. + +## Integration tests + +To run integration tests, use: +``` +make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS=-use-testing-log +``` + +Integration testing uses the same `config/performance-config.yaml` as +benchmarking. By default, workloads labeled as `integration-test` are executed +as part of integration testing. `-test-scheduling-label-filter` can be used to +change that. diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 1a5e82f6826..6d2d0e4ac2e 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1,3 +1,17 @@ +# The following labels are used in this file: +# - fast: short execution time, ideally less than 30 seconds +# - integration-test: used to select workloads that +# run in pull-kubernetes-integration. Choosing those tests +# is a tradeoff between code coverage and overall runtime. +# - performance: used to select workloads that run +# in ci-benchmark-scheduler-perf. Such workloads +# must run long enough (ideally, longer than 10 seconds) +# to provide meaningful samples for the pod scheduling +# rate. +# +# Combining "performance" and "fast" selects suitable workloads for a local +# before/after comparisons with benchstat. + - name: SchedulingBasic defaultPodTemplatePath: config/pod-default.yaml workloadTemplate: @@ -10,7 +24,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -39,7 +53,7 @@ namespace: sched-1 workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 100 @@ -161,7 +175,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -223,7 +237,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -308,7 +322,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 1000 @@ -386,7 +400,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 200 @@ -504,7 +518,7 @@ collectMetrics: true workloads: - name: 1000Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 1000 measurePods: 1000 @@ -734,6 +748,7 @@ collectMetrics: true workloads: - name: fast + labels: [integration-test, fast] params: # This testcase runs through all code paths without # taking too long overall. @@ -743,7 +758,7 @@ measurePods: 10 maxClaimsPerNode: 10 - name: 2000pods_100nodes - labels: [performance,fast] + labels: [performance, fast] params: # In this testcase, the number of nodes is smaller # than the limit for the PodScheduling slices. diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index bd02232eeee..619d46bc39a 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -43,6 +44,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" @@ -128,7 +130,7 @@ type testCase struct { Workloads []*workload // SchedulerConfigPath is the path of scheduler configuration // Optional - SchedulerConfigPath *string + SchedulerConfigPath string // Default path to spec file describing the pods to create. // This path can be overridden in createPodsOp by setting PodTemplatePath . // Optional @@ -640,7 +642,7 @@ func initTestOutput(tb testing.TB) io.Writer { return output } -var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-)") +var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling") func BenchmarkPerfScheduling(b *testing.B) { testCases, err := getTestCases(configFile) @@ -699,7 +701,8 @@ func BenchmarkPerfScheduling(b *testing.B) { for feature, flag := range tc.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - results := runWorkload(ctx, b, tc, w, false) + informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates) + results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false) dataItems.DataItems = append(dataItems.DataItems, results...) if len(results) > 0 { @@ -737,6 +740,95 @@ func BenchmarkPerfScheduling(b *testing.B) { } } +var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling") + +func TestScheduling(t *testing.T) { + testCases, err := getTestCases(configFile) + if err != nil { + t.Fatal(err) + } + if err = validateTestCases(testCases); err != nil { + t.Fatal(err) + } + + // Check for leaks at the very end. + framework.GoleakCheck(t) + + // All integration test cases share the same etcd, similar to + // https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222. + framework.StartEtcd(t, nil) + + // Workloads with the same configuration share the same apiserver. For that + // we first need to determine what those different configs are. + var configs []schedulerConfig + for _, tc := range testCases { + tcEnabled := false + for _, w := range tc.Workloads { + if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) { + tcEnabled = true + break + } + } + if !tcEnabled { + continue + } + exists := false + for _, config := range configs { + if config.equals(tc) { + exists = true + break + } + } + if !exists { + configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates}) + } + } + for _, config := range configs { + // Not a sub test because we don't have a good name for it. + func() { + _, ctx := ktesting.NewTestContext(t) + // No timeout here because the `go test -timeout` will ensure that + // the test doesn't get stuck forever. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for feature, flag := range config.featureGates { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)() + } + informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates) + + for _, tc := range testCases { + if !config.equals(tc) { + // Runs with some other config. + continue + } + + t.Run(tc.Name, func(t *testing.T) { + for _, w := range tc.Workloads { + t.Run(w.Name, func(t *testing.T) { + if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) { + t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter) + } + _, ctx := ktesting.NewTestContext(t) + runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true) + }) + } + }) + } + }() + } +} + +type schedulerConfig struct { + schedulerConfigPath string + featureGates map[featuregate.Feature]bool +} + +func (c schedulerConfig) equals(tc *testCase) bool { + return c.schedulerConfigPath == tc.SchedulerConfigPath && + cmp.Equal(c.featureGates, tc.FeatureGates) +} + func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) { data, err := os.ReadFile(file) if err != nil { @@ -753,16 +845,16 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk) } -func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { +func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op { var unrolled []op for opIndex, o := range wt { realOp, err := o.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } switch concreteOp := realOp.(type) { case *createPodSetsOp: - b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) + tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) for i := 0; i < concreteOp.Count; i++ { copy := concreteOp.CreatePodsOp ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i) @@ -776,28 +868,43 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { return unrolled } -func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, cleanup bool) []DataItem { - start := time.Now() - b.Cleanup(func() { - duration := time.Now().Sub(start) - // This includes startup and shutdown time and thus does not - // reflect scheduling performance. It's useful to get a feeling - // for how long each workload runs overall. - b.ReportMetric(duration.Seconds(), "runtime_seconds") - }) - +func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { var cfg *config.KubeSchedulerConfiguration var err error - if tc.SchedulerConfigPath != nil { - cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath) + if configPath != "" { + cfg, err = loadSchedulerConfig(configPath) if err != nil { - b.Fatalf("error loading scheduler config file: %v", err) + tb.Fatalf("error loading scheduler config file: %v", err) } if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil { - b.Fatalf("validate scheduler config file failed: %v", err) + tb.Fatalf("validate scheduler config file failed: %v", err) } } - informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates) + return mustSetupCluster(ctx, tb, cfg, featureGates) +} + +func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem { + b, benchmarking := tb.(*testing.B) + if benchmarking { + start := time.Now() + b.Cleanup(func() { + duration := time.Since(start) + // This includes startup and shutdown time and thus does not + // reflect scheduling performance. It's useful to get a feeling + // for how long each workload runs overall. + b.ReportMetric(duration.Seconds(), "runtime_seconds") + }) + } + + // Disable error checking of the sampling interval length in the + // throughput collector by default. When running benchmarks, report + // it as test failure when samples are not taken regularly. + var throughputErrorMargin float64 + if benchmarking { + // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough + // one day, we should think of another approach to avoid this trick. + throughputErrorMargin = 30 + } // Additional informers needed for testing. The pod informer was // already created before (scheduler.NewInformerFactory) and the @@ -820,45 +927,45 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if cleanup { // This must run before controllers get shut down. - defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace) + defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace) } - for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) { + for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) { realOp, err := op.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } select { case <-ctx.Done(): - b.Fatalf("op %d: %v", opIndex, ctx.Err()) + tb.Fatalf("op %d: %v", opIndex, ctx.Err()) default: } switch concreteOp := realOp.(type) { case *createNodesOp: nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if cleanup { - b.Cleanup(func() { + defer func() { if err := nodePreparer.CleanupNodes(ctx); err != nil { - b.Fatalf("failed to clean up nodes, error: %v", err) + tb.Fatalf("failed to clean up nodes, error: %v", err) } - }) + }() } nextNodeIndex += concreteOp.Count case *createNamespacesOp: - nsPreparer, err := newNamespacePreparer(concreteOp, client, b) + nsPreparer, err := newNamespacePreparer(concreteOp, client, tb) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nsPreparer.prepare(ctx); err != nil { nsPreparer.cleanup(ctx) - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { if _, ok := numPodsScheduledPerNamespace[n]; ok { @@ -875,7 +982,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) if concreteOp.PodTemplatePath == nil { concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } @@ -891,7 +998,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if concreteOp.CollectMetrics { collectorCtx, collectorCancel = context.WithCancel(ctx) defer collectorCancel() - collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig) + collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", tb.Name(), namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin) for _, collector := range collectors { // Need loop-local variable for function below. collector := collector @@ -902,8 +1009,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c }() } } - if err := createPods(ctx, b, namespace, concreteOp, client); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } if concreteOp.SkipWaitToCompletion { // Only record those namespaces that may potentially require barriers @@ -914,8 +1021,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c numPodsScheduledPerNamespace[namespace] = concreteOp.Count } } else { - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil { - b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil { + tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if concreteOp.CollectMetrics { @@ -949,7 +1056,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -957,12 +1064,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c for i, path := range concreteOp.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. @@ -1043,11 +1150,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c case *barrierOp: for _, namespace := range concreteOp.Namespaces { if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { - b.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } - if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. @@ -1067,19 +1174,19 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c default: runable, ok := concreteOp.(runnableOp) if !ok { - b.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) } - runable.run(ctx, b, client) + runable.run(ctx, tb, client) } } // check unused params and inform users unusedParams := w.unusedParams() if len(unusedParams) != 0 { - b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) + tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) } // Some tests have unschedulable pods. Do not add an implicit barrier at the @@ -1151,13 +1258,13 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl }).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces") } -func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { +func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet. // So, create that and register it. _, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) if err != nil { - b.Fatalf("failed to create namespace for Pod: %v", namespace) + tb.Fatalf("failed to create namespace for Pod: %v", namespace) } (*podsPerNamespace)[namespace] = 0 } @@ -1168,12 +1275,12 @@ type testDataCollector interface { collect() []DataItem } -func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector { +func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { if mcc == nil { mcc = &defaultMetricsCollectorConfig } return []testDataCollector{ - newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}), + newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin), newMetricsCollector(mcc, map[string]string{"Name": name}), } } @@ -1206,12 +1313,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { +func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { strategy, err := getPodStrategy(cpo) if err != nil { return err } - b.Logf("creating %d pods in namespace %q", cpo.Count, namespace) + tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace) config := testutils.NewTestPodCreatorConfig() config.AddStrategy(namespace, cpo.Count, strategy) podCreator := testutils.NewTestPodCreator(clientset, config) @@ -1221,7 +1328,7 @@ func createPods(ctx context.Context, b *testing.B, namespace string, cpo *create // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. -func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) { select { case <-ctx.Done(): @@ -1233,17 +1340,17 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf return false, err } if len(scheduled) >= wantCount { - b.Logf("scheduling succeed") + tb.Logf("scheduling succeed") return true, nil } - b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) + tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) return false, nil }) } // waitUntilPodsScheduled blocks until the all pods in the given namespaces are // scheduled. -func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1260,7 +1367,7 @@ func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer corei if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } @@ -1414,10 +1521,10 @@ type namespacePreparer struct { count int prefix string spec *v1.Namespace - t testing.TB + tb testing.TB } -func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) { +func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) { ns := &v1.Namespace{} if cno.NamespaceTemplatePath != nil { if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil { @@ -1430,7 +1537,7 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface count: cno.Count, prefix: cno.Prefix, spec: ns, - t: b, + tb: tb, }, nil } @@ -1449,7 +1556,7 @@ func (p *namespacePreparer) prepare(ctx context.Context) error { if p.spec != nil { base = p.spec } - p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) + p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) for i := 0; i < p.count; i++ { n := base.DeepCopy() n.Name = fmt.Sprintf("%s-%d", p.prefix, i) @@ -1469,7 +1576,7 @@ func (p *namespacePreparer) cleanup(ctx context.Context) error { for i := 0; i < p.count; i++ { n := fmt.Sprintf("%s-%d", p.prefix, i) if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil { - p.t.Errorf("Deleting Namespace: %v", err) + p.tb.Errorf("Deleting Namespace: %v", err) errRet = err } } diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 91d6edc78ad..79bc3ab10bb 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -73,7 +73,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { return &cfg, nil } -// mustSetupScheduler starts the following components: +// mustSetupCluster starts the following components: // - k8s api server // - scheduler // - some of the kube-controller-manager controllers @@ -82,11 +82,11 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 - _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, b, framework.TestServerSetup{ + _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} @@ -99,12 +99,12 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche } }, }) - b.Cleanup(tearDownFn) + tb.Cleanup(tearDownFn) // Cleanup will be in reverse order: first the clients get cancelled, // then the apiserver is torn down. ctx, cancel := context.WithCancel(ctx) - b.Cleanup(cancel) + tb.Cleanup(cancel) // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to // support this when there is any testcase that depends on such configuration. @@ -117,7 +117,7 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche var err error config, err = newDefaultComponentConfig() if err != nil { - b.Fatalf("Error creating default component config: %v", err) + tb.Fatalf("Error creating default component config: %v", err) } } @@ -128,14 +128,14 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. _, informerFactory := util.StartScheduler(ctx, client, cfg, config) util.StartFakePVController(ctx, client, informerFactory) - runGC := util.CreateGCController(ctx, b, *cfg, informerFactory) - runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory) + runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory) + runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory) runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { // Testing of DRA with inline resource claims depends on this // controller for creating and removing ResourceClaims. - runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory) + runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory) } informerFactory.Start(ctx.Done()) @@ -320,14 +320,16 @@ type throughputCollector struct { schedulingThroughputs []float64 labels map[string]string namespaces []string + errorMargin float64 } -func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector { +func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector { return &throughputCollector{ tb: tb, podInformer: podInformer, labels: labels, namespaces: namespaces, + errorMargin: errorMargin, } } @@ -388,9 +390,7 @@ func (tc *throughputCollector) run(ctx context.Context) { throughput := float64(newScheduled) / durationInSeconds expectedDuration := throughputSampleInterval * time.Duration(skipped+1) errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100 - // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough - // one day, we should think of another approach to avoid this trick. - if math.Abs(errorMargin) > 30 { + if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin { // This might affect the result, report it. tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin) }