diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index f1e8435e09b..215c369a082 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -41,27 +41,22 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" - cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1" - "k8s.io/client-go/metadata" - "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" basemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/testutil" - "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller/garbagecollector" jobcontroller "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/integration/util" "k8s.io/utils/pointer" ) @@ -1313,7 +1308,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { defer cancel() restConfig.QPS = 200 restConfig.Burst = 200 - runGC := createGC(ctx, t, restConfig, informerSet) + runGC := util.CreateGCController(ctx, t, *restConfig, informerSet) informerSet.Start(ctx.Done()) go jc.Run(ctx, 1) runGC() @@ -2092,40 +2087,6 @@ func createJobControllerWithSharedInformers(restConfig *restclient.Config, infor return jc, ctx, cancel } -func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() { - restConfig = restclient.AddUserAgent(restConfig, "gc-controller") - clientSet := clientset.NewForConfigOrDie(restConfig) - metadataClient, err := metadata.NewForConfig(restConfig) - if err != nil { - t.Fatalf("Failed to create metadataClient: %v", err) - } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery())) - restMapper.Reset() - metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) - alwaysStarted := make(chan struct{}) - close(alwaysStarted) - gc, err := garbagecollector.NewGarbageCollector( - clientSet, - metadataClient, - restMapper, - garbagecollector.DefaultIgnoredResources(), - informerfactory.NewInformerFactory(informerSet, metadataInformers), - alwaysStarted, - ) - if err != nil { - t.Fatalf("Failed creating garbage collector") - } - startGC := func() { - syncPeriod := 5 * time.Second - go wait.Until(func() { - restMapper.Reset() - }, syncPeriod, ctx.Done()) - go gc.Run(ctx, 1) - go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) - } - return startGC -} - func hasJobTrackingFinalizer(obj metav1.Object) bool { for _, fin := range obj.GetFinalizers() { if fin == batchv1.JobTrackingFinalizer { diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md index 91cf677e4ad..618e48a3be4 100644 --- a/test/integration/scheduler_perf/README.md +++ b/test/integration/scheduler_perf/README.md @@ -100,3 +100,15 @@ performance. During interactive debugging sessions it is possible to enable per-test output via -use-testing-log. 
+ +## Integration tests + +To run integration tests, use: +``` +make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS=-use-testing-log +``` + +Integration testing uses the same `config/performance-config.yaml` as +benchmarking. By default, workloads labeled as `integration-test` are executed +as part of integration testing. `-test-scheduling-label-filter` can be used to +change that. diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 1a5e82f6826..6d2d0e4ac2e 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1,3 +1,17 @@ +# The following labels are used in this file: +# - fast: short execution time, ideally less than 30 seconds +# - integration-test: used to select workloads that +# run in pull-kubernetes-integration. Choosing those tests +# is a tradeoff between code coverage and overall runtime. +# - performance: used to select workloads that run +# in ci-benchmark-scheduler-perf. Such workloads +# must run long enough (ideally, longer than 10 seconds) +# to provide meaningful samples for the pod scheduling +# rate. +# +# Combining "performance" and "fast" selects suitable workloads for a local +# before/after comparisons with benchstat. + - name: SchedulingBasic defaultPodTemplatePath: config/pod-default.yaml workloadTemplate: @@ -10,7 +24,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -39,7 +53,7 @@ namespace: sched-1 workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 100 @@ -161,7 +175,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -223,7 +237,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -308,7 +322,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 1000 @@ -386,7 +400,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 200 @@ -504,7 +518,7 @@ collectMetrics: true workloads: - name: 1000Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 1000 measurePods: 1000 @@ -734,6 +748,7 @@ collectMetrics: true workloads: - name: fast + labels: [integration-test, fast] params: # This testcase runs through all code paths without # taking too long overall. @@ -743,7 +758,7 @@ measurePods: 10 maxClaimsPerNode: 10 - name: 2000pods_100nodes - labels: [performance,fast] + labels: [performance, fast] params: # In this testcase, the number of nodes is smaller # than the limit for the PodScheduling slices. 
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index e6543c66bbd..ecbfce6887b 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -41,6 +43,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" @@ -125,7 +128,7 @@ type testCase struct { Workloads []*workload // SchedulerConfigPath is the path of scheduler configuration // Optional - SchedulerConfigPath *string + SchedulerConfigPath string // Default path to spec file describing the pods to create. // This path can be overridden in createPodsOp by setting PodTemplatePath . // Optional @@ -637,7 +640,7 @@ func initTestOutput(tb testing.TB) io.Writer { return output } -var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-)") +var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling") func BenchmarkPerfScheduling(b *testing.B) { testCases, err := getTestCases(configFile) @@ -696,7 +699,8 @@ func BenchmarkPerfScheduling(b *testing.B) { for feature, flag := range tc.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - results := runWorkload(ctx, b, tc, w) + informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates) + results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false) dataItems.DataItems = append(dataItems.DataItems, results...) if len(results) > 0 { @@ -734,6 +738,95 @@ func BenchmarkPerfScheduling(b *testing.B) { } } +var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling") + +func TestScheduling(t *testing.T) { + testCases, err := getTestCases(configFile) + if err != nil { + t.Fatal(err) + } + if err = validateTestCases(testCases); err != nil { + t.Fatal(err) + } + + // Check for leaks at the very end. + framework.GoleakCheck(t) + + // All integration test cases share the same etcd, similar to + // https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222. + framework.StartEtcd(t, nil) + + // Workloads with the same configuration share the same apiserver. For that + // we first need to determine what those different configs are. + var configs []schedulerConfig + for _, tc := range testCases { + tcEnabled := false + for _, w := range tc.Workloads { + if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) 
{ + tcEnabled = true + break + } + } + if !tcEnabled { + continue + } + exists := false + for _, config := range configs { + if config.equals(tc) { + exists = true + break + } + } + if !exists { + configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates}) + } + } + for _, config := range configs { + // Not a sub test because we don't have a good name for it. + func() { + _, ctx := ktesting.NewTestContext(t) + // No timeout here because the `go test -timeout` will ensure that + // the test doesn't get stuck forever. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for feature, flag := range config.featureGates { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)() + } + informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates) + + for _, tc := range testCases { + if !config.equals(tc) { + // Runs with some other config. + continue + } + + t.Run(tc.Name, func(t *testing.T) { + for _, w := range tc.Workloads { + t.Run(w.Name, func(t *testing.T) { + if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) { + t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter) + } + _, ctx := ktesting.NewTestContext(t) + runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true) + }) + } + }) + } + }() + } +} + +type schedulerConfig struct { + schedulerConfigPath string + featureGates map[featuregate.Feature]bool +} + +func (c schedulerConfig) equals(tc *testCase) bool { + return c.schedulerConfigPath == tc.SchedulerConfigPath && + cmp.Equal(c.featureGates, tc.FeatureGates) +} + func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) { data, err := os.ReadFile(file) if err != nil { @@ -750,16 +843,16 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk) } -func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { +func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op { var unrolled []op for opIndex, o := range wt { realOp, err := o.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } switch concreteOp := realOp.(type) { case *createPodSetsOp: - b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) + tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) for i := 0; i < concreteOp.Count; i++ { copy := concreteOp.CreatePodsOp ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i) @@ -773,35 +866,56 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { return unrolled } -func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem { - start := time.Now() - b.Cleanup(func() { - duration := time.Now().Sub(start) - // This includes startup and shutdown time and thus does not - // reflect scheduling performance. It's useful to get a feeling - // for how long each workload runs overall. 
- b.ReportMetric(duration.Seconds(), "runtime_seconds") - }) - +func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { var cfg *config.KubeSchedulerConfiguration var err error - if tc.SchedulerConfigPath != nil { - cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath) + if configPath != "" { + cfg, err = loadSchedulerConfig(configPath) if err != nil { - b.Fatalf("error loading scheduler config file: %v", err) + tb.Fatalf("error loading scheduler config file: %v", err) } if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil { - b.Fatalf("validate scheduler config file failed: %v", err) + tb.Fatalf("validate scheduler config file failed: %v", err) } } - informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates) + return mustSetupCluster(ctx, tb, cfg, featureGates) +} + +func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem { + b, benchmarking := tb.(*testing.B) + if benchmarking { + start := time.Now() + b.Cleanup(func() { + duration := time.Since(start) + // This includes startup and shutdown time and thus does not + // reflect scheduling performance. It's useful to get a feeling + // for how long each workload runs overall. + b.ReportMetric(duration.Seconds(), "runtime_seconds") + }) + } + + // Disable error checking of the sampling interval length in the + // throughput collector by default. When running benchmarks, report + // it as test failure when samples are not taken regularly. + var throughputErrorMargin float64 + if benchmarking { + // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough + // one day, we should think of another approach to avoid this trick. + throughputErrorMargin = 30 + } // Additional informers needed for testing. The pod informer was // already created before (scheduler.NewInformerFactory) and the - // factory was started for it (mustSetupScheduler), therefore we don't + // factory was started for it (mustSetupCluster), therefore we don't // need to start again. podInformer := informerFactory.Core().V1().Pods() + // Everything else started by this function gets stopped before it returns. + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + defer wg.Wait() + defer cancel() + var mu sync.Mutex var dataItems []DataItem nextNodeIndex := 0 @@ -809,48 +923,47 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. numPodsScheduledPerNamespace := make(map[string]int) - b.Cleanup(func() { - for namespace := range numPodsScheduledPerNamespace { - if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil { - b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err) - } - } - }) + if cleanup { + // This must run before controllers get shut down. 
+ defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace) + } - for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) { + for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) { realOp, err := op.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } select { case <-ctx.Done(): - b.Fatalf("op %d: %v", opIndex, ctx.Err()) + tb.Fatalf("op %d: %v", opIndex, ctx.Err()) default: } switch concreteOp := realOp.(type) { case *createNodesOp: nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) + } + if cleanup { + defer func() { + if err := nodePreparer.CleanupNodes(ctx); err != nil { + tb.Fatalf("failed to clean up nodes, error: %v", err) + } + }() } - b.Cleanup(func() { - if err := nodePreparer.CleanupNodes(ctx); err != nil { - b.Fatalf("failed to clean up nodes, error: %v", err) - } - }) nextNodeIndex += concreteOp.Count case *createNamespacesOp: - nsPreparer, err := newNamespacePreparer(concreteOp, client, b) + nsPreparer, err := newNamespacePreparer(concreteOp, client, tb) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nsPreparer.prepare(ctx); err != nil { nsPreparer.cleanup(ctx) - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } for _, n := range nsPreparer.namespaces() { if _, ok := numPodsScheduledPerNamespace[n]; ok { @@ -867,18 +980,23 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) if concreteOp.PodTemplatePath == nil { concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } var collectors []testDataCollector + // This needs a separate context and wait group because + // the code below needs to be sure that the goroutines + // are stopped. var collectorCtx context.Context var collectorCancel func() var collectorWG sync.WaitGroup + defer collectorWG.Wait() + if concreteOp.CollectMetrics { collectorCtx, collectorCancel = context.WithCancel(ctx) defer collectorCancel() - collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig) + collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", tb.Name(), namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin) for _, collector := range collectors { // Need loop-local variable for function below. 
collector := collector @@ -889,8 +1007,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ }() } } - if err := createPods(ctx, b, namespace, concreteOp, client); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } if concreteOp.SkipWaitToCompletion { // Only record those namespaces that may potentially require barriers @@ -901,8 +1019,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ numPodsScheduledPerNamespace[namespace] = concreteOp.Count } } else { - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil { - b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil { + tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if concreteOp.CollectMetrics { @@ -936,7 +1054,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ // Ensure the namespace exists. nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -944,12 +1062,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ for i, path := range concreteOp.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. @@ -983,7 +1101,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ switch concreteOp.Mode { case Create: + wg.Add(1) go func() { + defer wg.Done() count, threshold := 0, concreteOp.Number if threshold == 0 { threshold = math.MaxInt32 @@ -1001,7 +1121,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ } }() case Recreate: + wg.Add(1) go func() { + defer wg.Done() retVals := make([][]string, len(churnFns)) // For each churn function, instantiate a slice of strings with length "concreteOp.Number". 
for i := range retVals { @@ -1026,11 +1148,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ case *barrierOp: for _, namespace := range concreteOp.Namespaces { if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { - b.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } - if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. @@ -1050,19 +1172,19 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ default: runable, ok := concreteOp.(runnableOp) if !ok { - b.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) } - runable.run(ctx, b, client) + runable.run(ctx, tb, client) } } // check unused params and inform users unusedParams := w.unusedParams() if len(unusedParams) != 0 { - b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) + tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) } // Some tests have unschedulable pods. Do not add an implicit barrier at the @@ -1070,13 +1192,58 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ return dataItems } -func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { +// cleanupWorkload ensures that everything is removed from the API server that +// might have been created by runWorkload. This must be done before starting +// the next workload because otherwise it might stumble over previously created +// objects. For example, the namespaces are the same in different workloads, so +// not deleting them would cause the next one to fail with "cannot create +// namespace: already exists". +// +// Calling cleanupWorkload can be skipped if it is known that the next workload +// will run with a fresh etcd instance. +func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) { + deleteNow := *metav1.NewDeleteOptions(0) + for namespace := range numPodsScheduledPerNamespace { + // Pods have to be deleted explicitly, with no grace period. Normally + // kubelet will set the DeletionGracePeriodSeconds to zero when it's okay + // to remove a deleted pod, but we don't run kubelet... 
+ if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil { + tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err) + } + if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil { + tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err) + } + } + + // We need to wait here because even with deletion timestamp set, + // actually removing a namespace can take some time (garbage collecting + // other generated object like secrets, etc.) and we don't want to + // start the next workloads while that cleanup is still going on. + if err := wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { + namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, namespace := range namespaces.Items { + if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok { + // A namespace created by the workload, need to wait. + return false, nil + } + } + // All namespaces gone. + return true, nil + }); err != nil { + tb.Fatalf("failed while waiting for namespace removal: %v", err) + } +} + +func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet. // So, create that and register it. _, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) if err != nil { - b.Fatalf("failed to create namespace for Pod: %v", namespace) + tb.Fatalf("failed to create namespace for Pod: %v", namespace) } (*podsPerNamespace)[namespace] = 0 } @@ -1087,12 +1254,12 @@ type testDataCollector interface { collect() []DataItem } -func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector { +func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { if mcc == nil { mcc = &defaultMetricsCollectorConfig } return []testDataCollector{ - newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}), + newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin), newMetricsCollector(mcc, map[string]string{"Name": name}), } } @@ -1125,12 +1292,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { +func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { strategy, err := getPodStrategy(cpo) if err != nil { return err } - b.Logf("creating %d pods in namespace %q", cpo.Count, namespace) + tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace) config := testutils.NewTestPodCreatorConfig() config.AddStrategy(namespace, cpo.Count, strategy) podCreator := testutils.NewTestPodCreator(clientset, config) @@ -1140,7 +1307,7 @@ func createPods(ctx context.Context, b *testing.B, namespace string, cpo *create // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are 
scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. -func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) { select { case <-ctx.Done(): @@ -1152,17 +1319,17 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf return false, err } if len(scheduled) >= wantCount { - b.Logf("scheduling succeed") + tb.Logf("scheduling succeed") return true, nil } - b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) + tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) return false, nil }) } // waitUntilPodsScheduled blocks until the all pods in the given namespaces are // scheduled. -func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1179,7 +1346,7 @@ func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer corei if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } @@ -1333,10 +1500,10 @@ type namespacePreparer struct { count int prefix string spec *v1.Namespace - t testing.TB + tb testing.TB } -func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) { +func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) { ns := &v1.Namespace{} if cno.NamespaceTemplatePath != nil { if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil { @@ -1349,7 +1516,7 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface count: cno.Count, prefix: cno.Prefix, spec: ns, - t: b, + tb: tb, }, nil } @@ -1368,7 +1535,7 @@ func (p *namespacePreparer) prepare(ctx context.Context) error { if p.spec != nil { base = p.spec } - p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) + p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) for i := 0; i < p.count; i++ { n := base.DeepCopy() n.Name = fmt.Sprintf("%s-%d", p.prefix, i) @@ -1388,7 +1555,7 @@ func (p *namespacePreparer) cleanup(ctx context.Context) error { for i := 0; i < p.count; i++ { n := fmt.Sprintf("%s-%d", p.prefix, i) if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil { - p.t.Errorf("Deleting Namespace: %v", err) + p.tb.Errorf("Deleting Namespace: %v", err) errRet = err } } diff --git a/test/integration/scheduler_perf/util.go 
b/test/integration/scheduler_perf/util.go index e9fb5273be8..79bc3ab10bb 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -73,18 +73,20 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { return &cfg, nil } -// mustSetupScheduler starts the following components: +// mustSetupCluster starts the following components: // - k8s api server // - scheduler +// - some of the kube-controller-manager controllers +// // It returns regular and dynamic clients, and destroyFunc which should be used to // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 - _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, b, framework.TestServerSetup{ + _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} @@ -97,12 +99,12 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc } }, }) - b.Cleanup(tearDownFn) + tb.Cleanup(tearDownFn) // Cleanup will be in reverse order: first the clients get cancelled, // then the apiserver is torn down. ctx, cancel := context.WithCancel(ctx) - b.Cleanup(cancel) + tb.Cleanup(cancel) // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to // support this when there is any testcase that depends on such configuration. @@ -115,7 +117,7 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc var err error config, err = newDefaultComponentConfig() if err != nil { - b.Fatalf("Error creating default component config: %v", err) + tb.Fatalf("Error creating default component config: %v", err) } } @@ -126,16 +128,20 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. _, informerFactory := util.StartScheduler(ctx, client, cfg, config) util.StartFakePVController(ctx, client, informerFactory) + runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory) + runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory) runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { // Testing of DRA with inline resource claims depends on this // controller for creating and removing ResourceClaims. 
- runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory) + runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory) } informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) + go runGC() + go runNS() go runResourceClaimController() return informerFactory, client, dynClient @@ -314,14 +320,16 @@ type throughputCollector struct { schedulingThroughputs []float64 labels map[string]string namespaces []string + errorMargin float64 } -func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector { +func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector { return &throughputCollector{ tb: tb, podInformer: podInformer, labels: labels, namespaces: namespaces, + errorMargin: errorMargin, } } @@ -382,9 +390,7 @@ func (tc *throughputCollector) run(ctx context.Context) { throughput := float64(newScheduled) / durationInSeconds expectedDuration := throughputSampleInterval * time.Duration(skipped+1) errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100 - // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough - // one day, we should think of another approach to avoid this trick. - if math.Abs(errorMargin) > 30 { + if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin { // This might affect the result, report it. tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin) } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 1f154ed1802..4026cbf8342 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -38,17 +38,22 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" pvutil "k8s.io/component-helpers/storage/volume" + "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" "k8s.io/kube-scheduler/config/v1beta3" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/disruption" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/resourceclaim" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/scheduler" @@ -137,7 +142,13 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i claimRef := obj.Spec.ClaimRef pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{}) if err != nil { - klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + // Note that the error can be anything, because components like + // apiserver are also shutting down at the same time, but this + // check is conservative and only ignores the "context canceled" + // error while shutting down. 
+ if ctx.Err() == nil || !errors.Is(err, context.Canceled) { + klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + } return } @@ -146,7 +157,10 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes") _, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) if err != nil { - klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + if ctx.Err() == nil || !errors.Is(err, context.Canceled) { + // Shutting down, no need to record this. + klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + } return } } @@ -163,6 +177,67 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i }) } +// CreateGCController creates a garbage controller and returns a run function +// for it. The informer factory needs to be started before invoking that +// function. +func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { + restclient.AddUserAgent(&restConfig, "gc-controller") + clientSet := clientset.NewForConfigOrDie(&restConfig) + metadataClient, err := metadata.NewForConfig(&restConfig) + if err != nil { + tb.Fatalf("Failed to create metadataClient: %v", err) + } + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery())) + restMapper.Reset() + metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) + alwaysStarted := make(chan struct{}) + close(alwaysStarted) + gc, err := garbagecollector.NewGarbageCollector( + clientSet, + metadataClient, + restMapper, + garbagecollector.DefaultIgnoredResources(), + informerfactory.NewInformerFactory(informerSet, metadataInformers), + alwaysStarted, + ) + if err != nil { + tb.Fatalf("Failed creating garbage collector") + } + startGC := func() { + syncPeriod := 5 * time.Second + go wait.Until(func() { + restMapper.Reset() + }, syncPeriod, ctx.Done()) + go gc.Run(ctx, 1) + go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) + } + return startGC +} + +// CreateNamespaceController creates a namespace controller and returns a run +// function for it. The informer factory needs to be started before invoking +// that function. +func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { + restclient.AddUserAgent(&restConfig, "namespace-controller") + clientSet := clientset.NewForConfigOrDie(&restConfig) + metadataClient, err := metadata.NewForConfig(&restConfig) + if err != nil { + tb.Fatalf("Failed to create metadataClient: %v", err) + } + discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources + controller := namespace.NewNamespaceController( + ctx, + clientSet, + metadataClient, + discoverResourcesFn, + informerSet.Core().V1().Namespaces(), + 10*time.Hour, + v1.FinalizerKubernetes) + return func() { + go controller.Run(ctx, 5) + } +} + // TestContext store necessary context info type TestContext struct { NS *v1.Namespace