From c91c578795c119aecba9cc9b851ae88ca13d2526 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 22 Mar 2023 13:29:33 +0100 Subject: [PATCH 1/6] scheduler_perf: skip expensive cleanup during benchmarks Each benchmark test case runs with a fresh etcd instance. Therefore it is not necessary to delete objects after a run. A future unit test might reuse etcd, therefore cleanup is optional. --- .../scheduler_perf/scheduler_perf_test.go | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index e6543c66bbd..a39e76eb6b4 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -696,7 +696,7 @@ func BenchmarkPerfScheduling(b *testing.B) { for feature, flag := range tc.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - results := runWorkload(ctx, b, tc, w) + results := runWorkload(ctx, b, tc, w, false) dataItems.DataItems = append(dataItems.DataItems, results...) if len(results) > 0 { @@ -773,7 +773,7 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { return unrolled } -func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem { +func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, cleanup bool) []DataItem { start := time.Now() b.Cleanup(func() { duration := time.Now().Sub(start) @@ -809,13 +809,15 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ // All namespaces listed in numPodsScheduledPerNamespace will be cleaned up. 
numPodsScheduledPerNamespace := make(map[string]int) - b.Cleanup(func() { - for namespace := range numPodsScheduledPerNamespace { - if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil { - b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err) + if cleanup { + b.Cleanup(func() { + for namespace := range numPodsScheduledPerNamespace { + if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil { + b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err) + } } - } - }) + }) + } for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) { realOp, err := op.realOp.patchParams(w) @@ -836,11 +838,13 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [ if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil { b.Fatalf("op %d: %v", opIndex, err) } - b.Cleanup(func() { - if err := nodePreparer.CleanupNodes(ctx); err != nil { - b.Fatalf("failed to clean up nodes, error: %v", err) - } - }) + if cleanup { + b.Cleanup(func() { + if err := nodePreparer.CleanupNodes(ctx); err != nil { + b.Fatalf("failed to clean up nodes, error: %v", err) + } + }) + } nextNodeIndex += concreteOp.Count case *createNamespacesOp: From 2e7f37353cb21d9cd8572b029d138fdce7846d9b Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 22 Mar 2023 13:31:48 +0100 Subject: [PATCH 2/6] test/integration: avoid errors in fake PV controller during shutdown Once the context is canceled, the controller can stop processing events. Without this change it prints errors when the apiserver is already down. 
--- test/integration/util/util.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 359459e6846..4a0451d326d 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -127,7 +127,13 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i claimRef := obj.Spec.ClaimRef pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{}) if err != nil { - klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + // Note that the error can be anything, because components like + // apiserver are also shutting down at the same time, but this + // check is conservative and only ignores the "context canceled" + // error while shutting down. + if ctx.Err() == nil || !errors.Is(err, context.Canceled) { + klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + } return } @@ -136,7 +142,10 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes") _, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) if err != nil { - klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + if ctx.Err() == nil || !errors.Is(err, context.Canceled) { + // Record the error unless it is "context canceled" during shutdown. + klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err) + } return } } From d9c16a1cedd28d5492792afcc63af8b9f49b52b8 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 23 Mar 2023 16:50:22 +0100 Subject: [PATCH 3/6] scheduler_perf: fix goroutine leak in runWorkload This becomes relevant when doing more fine-grained leak checking. 
--- .../scheduler_perf/scheduler_perf_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index a39e76eb6b4..ba051aa8c9c 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -802,6 +802,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c // need to start again. podInformer := informerFactory.Core().V1().Pods() + // Everything else started by this function gets stopped before it returns. + ctx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + defer wg.Wait() + defer cancel() + var mu sync.Mutex var dataItems []DataItem nextNodeIndex := 0 @@ -876,9 +882,14 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } var collectors []testDataCollector + // This needs a separate context and wait group because + // the code below needs to be sure that the goroutines + // are stopped. var collectorCtx context.Context var collectorCancel func() var collectorWG sync.WaitGroup + defer collectorWG.Wait() + if concreteOp.CollectMetrics { collectorCtx, collectorCancel = context.WithCancel(ctx) defer collectorCancel() @@ -987,7 +998,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c switch concreteOp.Mode { case Create: + wg.Add(1) go func() { + defer wg.Done() count, threshold := 0, concreteOp.Number if threshold == 0 { threshold = math.MaxInt32 @@ -1005,7 +1018,9 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c } }() case Recreate: + wg.Add(1) go func() { + defer wg.Done() retVals := make([][]string, len(churnFns)) // For each churn function, instantiate a slice of strings with length "concreteOp.Number". 
for i := range retVals { From dfd646e0a89d21d303422a51aeba0364b0551da9 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 23 Mar 2023 16:59:30 +0100 Subject: [PATCH 4/6] scheduler_perf: fix namespace deletion Merely deleting the namespace is not enough: - Workloads might rely on the garbage collector to get rid of obsolete objects, so we should run it to be on the safe side. - Pods must be force-deleted because kubelet is not running. - Finally, the namespace controller is needed to get rid of deleted namespaces. --- test/integration/job/job_test.go | 43 +--------- .../scheduler_perf/scheduler_perf_test.go | 80 ++++++++++++++++--- test/integration/scheduler_perf/util.go | 8 +- test/integration/util/util.go | 66 +++++++++++++++ 4 files changed, 146 insertions(+), 51 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index f1e8435e09b..215c369a082 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -41,27 +41,22 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" - cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1" - "k8s.io/client-go/metadata" - "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" basemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/testutil" - "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller/garbagecollector" jobcontroller "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/features" 
"k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/integration/util" "k8s.io/utils/pointer" ) @@ -1313,7 +1308,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { defer cancel() restConfig.QPS = 200 restConfig.Burst = 200 - runGC := createGC(ctx, t, restConfig, informerSet) + runGC := util.CreateGCController(ctx, t, *restConfig, informerSet) informerSet.Start(ctx.Done()) go jc.Run(ctx, 1) runGC() @@ -2092,40 +2087,6 @@ func createJobControllerWithSharedInformers(restConfig *restclient.Config, infor return jc, ctx, cancel } -func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() { - restConfig = restclient.AddUserAgent(restConfig, "gc-controller") - clientSet := clientset.NewForConfigOrDie(restConfig) - metadataClient, err := metadata.NewForConfig(restConfig) - if err != nil { - t.Fatalf("Failed to create metadataClient: %v", err) - } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery())) - restMapper.Reset() - metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) - alwaysStarted := make(chan struct{}) - close(alwaysStarted) - gc, err := garbagecollector.NewGarbageCollector( - clientSet, - metadataClient, - restMapper, - garbagecollector.DefaultIgnoredResources(), - informerfactory.NewInformerFactory(informerSet, metadataInformers), - alwaysStarted, - ) - if err != nil { - t.Fatalf("Failed creating garbage collector") - } - startGC := func() { - syncPeriod := 5 * time.Second - go wait.Until(func() { - restMapper.Reset() - }, syncPeriod, ctx.Done()) - go gc.Run(ctx, 1) - go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) - } - return startGC -} - func hasJobTrackingFinalizer(obj metav1.Object) bool { for _, fin := range obj.GetFinalizers() { if fin == batchv1.JobTrackingFinalizer { diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go 
b/test/integration/scheduler_perf/scheduler_perf_test.go index ba051aa8c9c..bd02232eeee 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -30,6 +30,8 @@ import ( "testing" "time" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -47,6 +49,7 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" @@ -794,11 +797,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c b.Fatalf("validate scheduler config file failed: %v", err) } } - informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates) + informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates) // Additional informers needed for testing. The pod informer was // already created before (scheduler.NewInformerFactory) and the - // factory was started for it (mustSetupScheduler), therefore we don't + // factory was started for it (mustSetupCluster), therefore we don't // need to start again. podInformer := informerFactory.Core().V1().Pods() @@ -816,13 +819,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c numPodsScheduledPerNamespace := make(map[string]int) if cleanup { - b.Cleanup(func() { - for namespace := range numPodsScheduledPerNamespace { - if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil { - b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err) - } - } - }) + // This must run before controllers get shut down. 
+ defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace) } for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) { @@ -1089,6 +1087,70 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c return dataItems } +// cleanupWorkload ensures that everything is removed from the API server that +// might have been created by runWorkload. This must be done before starting +// the next workload because otherwise it might stumble over previously created +// objects. For example, the namespaces are the same in different workloads, so +// not deleting them would cause the next one to fail with "cannot create +// namespace: already exists". +// +// Calling cleanupWorkload can be skipped if it is known that the next workload +// will run with a fresh etcd instance. +func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) { + deleteNow := *metav1.NewDeleteOptions(0) + for namespace := range numPodsScheduledPerNamespace { + // Pods have to be deleted explicitly, with no grace period. Normally + // kubelet will set the DeletionGracePeriodSeconds to zero when it's okay + // to remove a deleted pod, but we don't run kubelet... + if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil { + tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err) + } + if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil { + tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err) + } + } + + // We need to wait here because even with deletion timestamp set, + // actually removing a namespace can take some time (garbage collecting + // other generated object like secrets, etc.) and we don't want to + // start the next workloads while that cleanup is still going on. 
+ gomega.NewGomegaWithT(tb).Eventually(ctx, func(ctx context.Context) ([]interface{}, error) { + var objects []interface{} + namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + // Collecting several objects of interest (pods, claims) is done to + // provide a more informative failure message when a namespace doesn't + // disappear quickly enough. + for _, namespace := range namespaces.Items { + if _, ok := numPodsScheduledPerNamespace[namespace.Name]; !ok { + // Not a namespace created by the workload. + continue + } + pods, err := client.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + if len(pods.Items) > 0 { + // Record one pod per namespace - that's usually enough information. + objects = append(objects, pods.Items[0]) + } + if tc.FeatureGates[features.DynamicResourceAllocation] { + claims, err := client.ResourceV1alpha2().ResourceClaims(namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + if len(claims.Items) > 0 { + objects = append(objects, claims.Items[0]) + } + } + objects = append(objects, namespace) + } + return objects, nil + }).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces") +} + func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet. 
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index e9fb5273be8..91d6edc78ad 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -76,11 +76,13 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // mustSetupScheduler starts the following components: // - k8s api server // - scheduler +// - some of the kube-controller-manager controllers +// // It returns regular and dynamic clients, and destroyFunc which should be used to // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 @@ -126,6 +128,8 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. 
_, informerFactory := util.StartScheduler(ctx, client, cfg, config) util.StartFakePVController(ctx, client, informerFactory) + runGC := util.CreateGCController(ctx, b, *cfg, informerFactory) + runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory) runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { @@ -136,6 +140,8 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) + go runGC() + go runNS() go runResourceClaimController() return informerFactory, client, dynClient diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 4a0451d326d..0875350d3c5 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -36,17 +36,22 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" pvutil "k8s.io/component-helpers/storage/volume" + "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" "k8s.io/kube-scheduler/config/v1beta3" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/disruption" + "k8s.io/kubernetes/pkg/controller/garbagecollector" + "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/resourceclaim" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/scheduler" @@ -162,6 +167,67 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i }) } +// CreateGCController creates a garbage controller and returns a run function +// for it. The informer factory needs to be started before invoking that +// function. 
+func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { + restclient.AddUserAgent(&restConfig, "gc-controller") + clientSet := clientset.NewForConfigOrDie(&restConfig) + metadataClient, err := metadata.NewForConfig(&restConfig) + if err != nil { + tb.Fatalf("Failed to create metadataClient: %v", err) + } + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery())) + restMapper.Reset() + metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0) + alwaysStarted := make(chan struct{}) + close(alwaysStarted) + gc, err := garbagecollector.NewGarbageCollector( + clientSet, + metadataClient, + restMapper, + garbagecollector.DefaultIgnoredResources(), + informerfactory.NewInformerFactory(informerSet, metadataInformers), + alwaysStarted, + ) + if err != nil { + tb.Fatalf("Failed creating garbage collector") + } + startGC := func() { + syncPeriod := 5 * time.Second + go wait.Until(func() { + restMapper.Reset() + }, syncPeriod, ctx.Done()) + go gc.Run(ctx, 1) + go gc.Sync(ctx, clientSet.Discovery(), syncPeriod) + } + return startGC +} + +// CreateNamespaceController creates a namespace controller and returns a run +// function for it. The informer factory needs to be started before invoking +// that function. 
+func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() { + restclient.AddUserAgent(&restConfig, "namespace-controller") + clientSet := clientset.NewForConfigOrDie(&restConfig) + metadataClient, err := metadata.NewForConfig(&restConfig) + if err != nil { + tb.Fatalf("Failed to create metadataClient: %v", err) + } + discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources + controller := namespace.NewNamespaceController( + ctx, + clientSet, + metadataClient, + discoverResourcesFn, + informerSet.Core().V1().Namespaces(), + 10*time.Hour, + v1.FinalizerKubernetes) + return func() { + go controller.Run(ctx, 5) + } +} + // TestContext store necessary context info type TestContext struct { NS *v1.Namespace From cecebe8ea2feee856bc7a62f4c16711ee8a5f5d9 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 23 Mar 2023 17:01:42 +0100 Subject: [PATCH 5/6] scheduler_perf: add TestScheduling integration test This runs workloads that are labeled as "integration-test". The apiserver and scheduler are only started once per unique configuration, followed by each workload using that configuration. This makes execution faster. In contrast to benchmarking, we care less about starting with a clean slate for each test. --- test/integration/scheduler_perf/README.md | 12 + .../config/performance-config.yaml | 31 ++- .../scheduler_perf/scheduler_perf_test.go | 237 +++++++++++++----- test/integration/scheduler_perf/util.go | 26 +- 4 files changed, 220 insertions(+), 86 deletions(-) diff --git a/test/integration/scheduler_perf/README.md b/test/integration/scheduler_perf/README.md index 91cf677e4ad..618e48a3be4 100644 --- a/test/integration/scheduler_perf/README.md +++ b/test/integration/scheduler_perf/README.md @@ -100,3 +100,15 @@ performance. During interactive debugging sessions it is possible to enable per-test output via -use-testing-log. 
+ +## Integration tests + +To run integration tests, use: +``` +make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS=-use-testing-log +``` + +Integration testing uses the same `config/performance-config.yaml` as +benchmarking. By default, workloads labeled as `integration-test` are executed +as part of integration testing. `-test-scheduling-label-filter` can be used to +change that. diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml index 1a5e82f6826..6d2d0e4ac2e 100644 --- a/test/integration/scheduler_perf/config/performance-config.yaml +++ b/test/integration/scheduler_perf/config/performance-config.yaml @@ -1,3 +1,17 @@ +# The following labels are used in this file: +# - fast: short execution time, ideally less than 30 seconds +# - integration-test: used to select workloads that +# run in pull-kubernetes-integration. Choosing those tests +# is a tradeoff between code coverage and overall runtime. +# - performance: used to select workloads that run +# in ci-benchmark-scheduler-perf. Such workloads +# must run long enough (ideally, longer than 10 seconds) +# to provide meaningful samples for the pod scheduling +# rate. +# +# Combining "performance" and "fast" selects suitable workloads for local +# before/after comparisons with benchstat. 
+ - name: SchedulingBasic defaultPodTemplatePath: config/pod-default.yaml workloadTemplate: @@ -10,7 +24,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -39,7 +53,7 @@ namespace: sched-1 workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 100 @@ -161,7 +175,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -223,7 +237,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 500 @@ -308,7 +322,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 1000 @@ -386,7 +400,7 @@ collectMetrics: true workloads: - name: 500Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 500 initPods: 200 @@ -504,7 +518,7 @@ collectMetrics: true workloads: - name: 1000Nodes - labels: [fast] + labels: [integration-test, fast] params: initNodes: 1000 measurePods: 1000 @@ -734,6 +748,7 @@ collectMetrics: true workloads: - name: fast + labels: [integration-test, fast] params: # This testcase runs through all code paths without # taking too long overall. @@ -743,7 +758,7 @@ measurePods: 10 maxClaimsPerNode: 10 - name: 2000pods_100nodes - labels: [performance,fast] + labels: [performance, fast] params: # In this testcase, the number of nodes is smaller # than the limit for the PodScheduling slices. 
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index bd02232eeee..619d46bc39a 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -43,6 +44,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" @@ -128,7 +130,7 @@ type testCase struct { Workloads []*workload // SchedulerConfigPath is the path of scheduler configuration // Optional - SchedulerConfigPath *string + SchedulerConfigPath string // Default path to spec file describing the pods to create. // This path can be overridden in createPodsOp by setting PodTemplatePath . 
// Optional @@ -640,7 +642,7 @@ func initTestOutput(tb testing.TB) io.Writer { return output } -var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-)") +var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling") func BenchmarkPerfScheduling(b *testing.B) { testCases, err := getTestCases(configFile) @@ -699,7 +701,8 @@ func BenchmarkPerfScheduling(b *testing.B) { for feature, flag := range tc.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - results := runWorkload(ctx, b, tc, w, false) + informerFactory, client, dyncClient := setupClusterForWorkload(ctx, b, tc.SchedulerConfigPath, tc.FeatureGates) + results := runWorkload(ctx, b, tc, w, informerFactory, client, dyncClient, false) dataItems.DataItems = append(dataItems.DataItems, results...) if len(results) > 0 { @@ -737,6 +740,95 @@ func BenchmarkPerfScheduling(b *testing.B) { } } +var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling") + +func TestScheduling(t *testing.T) { + testCases, err := getTestCases(configFile) + if err != nil { + t.Fatal(err) + } + if err = validateTestCases(testCases); err != nil { + t.Fatal(err) + } + + // Check for leaks at the very end. + framework.GoleakCheck(t) + + // All integration test cases share the same etcd, similar to + // https://github.com/kubernetes/kubernetes/blob/18d05b646d09b2971dc5400bc288062b0414e8cf/test/integration/framework/etcd.go#L186-L222. + framework.StartEtcd(t, nil) + + // Workloads with the same configuration share the same apiserver. 
For that + // we first need to determine what those different configs are. + var configs []schedulerConfig + for _, tc := range testCases { + tcEnabled := false + for _, w := range tc.Workloads { + if enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) { + tcEnabled = true + break + } + } + if !tcEnabled { + continue + } + exists := false + for _, config := range configs { + if config.equals(tc) { + exists = true + break + } + } + if !exists { + configs = append(configs, schedulerConfig{schedulerConfigPath: tc.SchedulerConfigPath, featureGates: tc.FeatureGates}) + } + } + for _, config := range configs { + // Not a sub test because we don't have a good name for it. + func() { + _, ctx := ktesting.NewTestContext(t) + // No timeout here because the `go test -timeout` will ensure that + // the test doesn't get stuck forever. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + for feature, flag := range config.featureGates { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, flag)() + } + informerFactory, client, dynClient := setupClusterForWorkload(ctx, t, config.schedulerConfigPath, config.featureGates) + + for _, tc := range testCases { + if !config.equals(tc) { + // Runs with some other config. + continue + } + + t.Run(tc.Name, func(t *testing.T) { + for _, w := range tc.Workloads { + t.Run(w.Name, func(t *testing.T) { + if !enabled(*testSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) 
{ + t.Skipf("disabled by label filter %q", *testSchedulingLabelFilter) + } + _, ctx := ktesting.NewTestContext(t) + runWorkload(ctx, t, tc, w, informerFactory, client, dynClient, true) + }) + } + }) + } + }() + } +} + +type schedulerConfig struct { + schedulerConfigPath string + featureGates map[featuregate.Feature]bool +} + +func (c schedulerConfig) equals(tc *testCase) bool { + return c.schedulerConfigPath == tc.SchedulerConfigPath && + cmp.Equal(c.featureGates, tc.FeatureGates) +} + func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) { data, err := os.ReadFile(file) if err != nil { @@ -753,16 +845,16 @@ func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk) } -func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { +func unrollWorkloadTemplate(tb testing.TB, wt []op, w *workload) []op { var unrolled []op for opIndex, o := range wt { realOp, err := o.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } switch concreteOp := realOp.(type) { case *createPodSetsOp: - b.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) + tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam) for i := 0; i < concreteOp.Count; i++ { copy := concreteOp.CreatePodsOp ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i) @@ -776,28 +868,43 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op { return unrolled } -func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, cleanup bool) []DataItem { - start := time.Now() - b.Cleanup(func() { - duration := time.Now().Sub(start) - // This includes startup and shutdown time and thus does not - // reflect scheduling performance. It's useful to get a feeling - // for how long each workload runs overall. 
- b.ReportMetric(duration.Seconds(), "runtime_seconds") - }) - +func setupClusterForWorkload(ctx context.Context, tb testing.TB, configPath string, featureGates map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { var cfg *config.KubeSchedulerConfiguration var err error - if tc.SchedulerConfigPath != nil { - cfg, err = loadSchedulerConfig(*tc.SchedulerConfigPath) + if configPath != "" { + cfg, err = loadSchedulerConfig(configPath) if err != nil { - b.Fatalf("error loading scheduler config file: %v", err) + tb.Fatalf("error loading scheduler config file: %v", err) } if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil { - b.Fatalf("validate scheduler config file failed: %v", err) + tb.Fatalf("validate scheduler config file failed: %v", err) } } - informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates) + return mustSetupCluster(ctx, tb, cfg, featureGates) +} + +func runWorkload(ctx context.Context, tb testing.TB, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory, client clientset.Interface, dynClient dynamic.Interface, cleanup bool) []DataItem { + b, benchmarking := tb.(*testing.B) + if benchmarking { + start := time.Now() + b.Cleanup(func() { + duration := time.Since(start) + // This includes startup and shutdown time and thus does not + // reflect scheduling performance. It's useful to get a feeling + // for how long each workload runs overall. + b.ReportMetric(duration.Seconds(), "runtime_seconds") + }) + } + + // Disable error checking of the sampling interval length in the + // throughput collector by default. When running benchmarks, report + // it as test failure when samples are not taken regularly. + var throughputErrorMargin float64 + if benchmarking { + // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough + // one day, we should think of another approach to avoid this trick. 
+ throughputErrorMargin = 30 + } // Additional informers needed for testing. The pod informer was // already created before (scheduler.NewInformerFactory) and the @@ -820,45 +927,45 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if cleanup { // This must run before controllers get shut down. - defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace) + defer cleanupWorkload(ctx, tb, tc, client, numPodsScheduledPerNamespace) } - for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) { + for opIndex, op := range unrollWorkloadTemplate(tb, tc.WorkloadTemplate, w) { realOp, err := op.realOp.patchParams(w) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } select { case <-ctx.Done(): - b.Fatalf("op %d: %v", opIndex, ctx.Err()) + tb.Fatalf("op %d: %v", opIndex, ctx.Err()) default: } switch concreteOp := realOp.(type) { case *createNodesOp: nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, client) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nodePreparer.PrepareNodes(ctx, nextNodeIndex); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if cleanup { - b.Cleanup(func() { + defer func() { if err := nodePreparer.CleanupNodes(ctx); err != nil { - b.Fatalf("failed to clean up nodes, error: %v", err) + tb.Fatalf("failed to clean up nodes, error: %v", err) } - }) + }() } nextNodeIndex += concreteOp.Count case *createNamespacesOp: - nsPreparer, err := newNamespacePreparer(concreteOp, client, b) + nsPreparer, err := newNamespacePreparer(concreteOp, client, tb) if err != nil { - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } if err := nsPreparer.prepare(ctx); err != nil { nsPreparer.cleanup(ctx) - b.Fatalf("op %d: %v", opIndex, err) + tb.Fatalf("op %d: %v", opIndex, err) } for _, n := range 
nsPreparer.namespaces() { if _, ok := numPodsScheduledPerNamespace[n]; ok { @@ -875,7 +982,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if concreteOp.Namespace != nil { namespace = *concreteOp.Namespace } - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) if concreteOp.PodTemplatePath == nil { concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath } @@ -891,7 +998,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c if concreteOp.CollectMetrics { collectorCtx, collectorCancel = context.WithCancel(ctx) defer collectorCancel() - collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig) + collectors = getTestDataCollectors(tb, podInformer, fmt.Sprintf("%s/%s", tb.Name(), namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin) for _, collector := range collectors { // Need loop-local variable for function below. 
collector := collector @@ -902,8 +1009,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c }() } } - if err := createPods(ctx, b, namespace, concreteOp, client); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := createPods(ctx, tb, namespace, concreteOp, client); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } if concreteOp.SkipWaitToCompletion { // Only record those namespaces that may potentially require barriers @@ -914,8 +1021,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c numPodsScheduledPerNamespace[namespace] = concreteOp.Count } } else { - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, concreteOp.Count); err != nil { - b.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, concreteOp.Count); err != nil { + tb.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } if concreteOp.CollectMetrics { @@ -949,7 +1056,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c // Ensure the namespace exists. 
nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}} if _, err := client.CoreV1().Namespaces().Create(ctx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) { - b.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) + tb.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err) } var churnFns []func(name string) string @@ -957,12 +1064,12 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c for i, path := range concreteOp.TemplatePaths { unstructuredObj, gvk, err := getUnstructuredFromFile(path) if err != nil { - b.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) + tb.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err) } // Obtain GVR. mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { - b.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) + tb.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err) } gvr := mapping.Resource // Distinguish cluster-scoped with namespaced API objects. @@ -1043,11 +1150,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c case *barrierOp: for _, namespace := range concreteOp.Namespaces { if _, ok := numPodsScheduledPerNamespace[namespace]; !ok { - b.Fatalf("op %d: unknown namespace %s", opIndex, namespace) + tb.Fatalf("op %d: unknown namespace %s", opIndex, namespace) } } - if err := waitUntilPodsScheduled(ctx, b, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { - b.Fatalf("op %d: %v", opIndex, err) + if err := waitUntilPodsScheduled(ctx, tb, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + tb.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods // pending scheduling in the namespaces that we just blocked on. 
@@ -1067,19 +1174,19 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c default: runable, ok := concreteOp.(runnableOp) if !ok { - b.Fatalf("op %d: invalid op %v", opIndex, concreteOp) + tb.Fatalf("op %d: invalid op %v", opIndex, concreteOp) } for _, namespace := range runable.requiredNamespaces() { - createNamespaceIfNotPresent(ctx, b, client, namespace, &numPodsScheduledPerNamespace) + createNamespaceIfNotPresent(ctx, tb, client, namespace, &numPodsScheduledPerNamespace) } - runable.run(ctx, b, client) + runable.run(ctx, tb, client) } } // check unused params and inform users unusedParams := w.unusedParams() if len(unusedParams) != 0 { - b.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) + tb.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name) } // Some tests have unschedulable pods. Do not add an implicit barrier at the @@ -1151,13 +1258,13 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl }).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces") } -func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { +func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) { if _, ok := (*podsPerNamespace)[namespace]; !ok { // The namespace has not created yet. // So, create that and register it. 
_, err := client.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) if err != nil { - b.Fatalf("failed to create namespace for Pod: %v", namespace) + tb.Fatalf("failed to create namespace for Pod: %v", namespace) } (*podsPerNamespace)[namespace] = 0 } @@ -1168,12 +1275,12 @@ type testDataCollector interface { collect() []DataItem } -func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector { +func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { if mcc == nil { mcc = &defaultMetricsCollectorConfig } return []testDataCollector{ - newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}), + newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin), newMetricsCollector(mcc, map[string]string{"Name": name}), } } @@ -1206,12 +1313,12 @@ func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Inte ), nil } -func createPods(ctx context.Context, b *testing.B, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { +func createPods(ctx context.Context, tb testing.TB, namespace string, cpo *createPodsOp, clientset clientset.Interface) error { strategy, err := getPodStrategy(cpo) if err != nil { return err } - b.Logf("creating %d pods in namespace %q", cpo.Count, namespace) + tb.Logf("creating %d pods in namespace %q", cpo.Count, namespace) config := testutils.NewTestPodCreatorConfig() config.AddStrategy(namespace, cpo.Count, strategy) podCreator := testutils.NewTestPodCreator(clientset, config) @@ -1221,7 +1328,7 @@ func createPods(ctx context.Context, b *testing.B, namespace string, cpo *create // waitUntilPodsScheduledInNamespace blocks until all 
pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. -func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsScheduledInNamespace(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { return wait.PollImmediate(1*time.Second, 10*time.Minute, func() (bool, error) { select { case <-ctx.Done(): @@ -1233,17 +1340,17 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf return false, err } if len(scheduled) >= wantCount { - b.Logf("scheduling succeed") + tb.Logf("scheduling succeed") return true, nil } - b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) + tb.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled)) return false, nil }) } // waitUntilPodsScheduled blocks until the all pods in the given namespaces are // scheduled. -func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsScheduled(ctx context.Context, tb testing.TB, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. 
if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1260,7 +1367,7 @@ func waitUntilPodsScheduled(ctx context.Context, b *testing.B, podInformer corei if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsScheduledInNamespace(ctx, b, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsScheduledInNamespace(ctx, tb, podInformer, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } @@ -1414,10 +1521,10 @@ type namespacePreparer struct { count int prefix string spec *v1.Namespace - t testing.TB + tb testing.TB } -func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, b *testing.B) (*namespacePreparer, error) { +func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface, tb testing.TB) (*namespacePreparer, error) { ns := &v1.Namespace{} if cno.NamespaceTemplatePath != nil { if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil { @@ -1430,7 +1537,7 @@ func newNamespacePreparer(cno *createNamespacesOp, clientset clientset.Interface count: cno.Count, prefix: cno.Prefix, spec: ns, - t: b, + tb: tb, }, nil } @@ -1449,7 +1556,7 @@ func (p *namespacePreparer) prepare(ctx context.Context) error { if p.spec != nil { base = p.spec } - p.t.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) + p.tb.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base) for i := 0; i < p.count; i++ { n := base.DeepCopy() n.Name = fmt.Sprintf("%s-%d", p.prefix, i) @@ -1469,7 +1576,7 @@ func (p *namespacePreparer) cleanup(ctx context.Context) error { for i := 0; i < p.count; i++ { n := fmt.Sprintf("%s-%d", p.prefix, i) if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil { - p.t.Errorf("Deleting Namespace: %v", err) + p.tb.Errorf("Deleting Namespace: %v", err) errRet = err } } diff 
--git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 91d6edc78ad..79bc3ab10bb 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -73,7 +73,7 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { return &cfg, nil } -// mustSetupScheduler starts the following components: +// mustSetupCluster starts the following components: // - k8s api server // - scheduler // - some of the kube-controller-manager controllers @@ -82,11 +82,11 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { +func mustSetupCluster(ctx context.Context, tb testing.TB, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) { // Run API server with minimimal logging by default. Can be raised with -v. framework.MinVerbosity = 0 - _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, b, framework.TestServerSetup{ + _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, tb, framework.TestServerSetup{ ModifyServerRunOptions: func(opts *options.ServerRunOptions) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. 
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"} @@ -99,12 +99,12 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche } }, }) - b.Cleanup(tearDownFn) + tb.Cleanup(tearDownFn) // Cleanup will be in reverse order: first the clients get cancelled, // then the apiserver is torn down. ctx, cancel := context.WithCancel(ctx) - b.Cleanup(cancel) + tb.Cleanup(cancel) // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to // support this when there is any testcase that depends on such configuration. @@ -117,7 +117,7 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche var err error config, err = newDefaultComponentConfig() if err != nil { - b.Fatalf("Error creating default component config: %v", err) + tb.Fatalf("Error creating default component config: %v", err) } } @@ -128,14 +128,14 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSche // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. _, informerFactory := util.StartScheduler(ctx, client, cfg, config) util.StartFakePVController(ctx, client, informerFactory) - runGC := util.CreateGCController(ctx, b, *cfg, informerFactory) - runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory) + runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory) + runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory) runResourceClaimController := func() {} if enabledFeatures[features.DynamicResourceAllocation] { // Testing of DRA with inline resource claims depends on this // controller for creating and removing ResourceClaims. 
- runResourceClaimController = util.CreateResourceClaimController(ctx, b, client, informerFactory) + runResourceClaimController = util.CreateResourceClaimController(ctx, tb, client, informerFactory) } informerFactory.Start(ctx.Done()) @@ -320,14 +320,16 @@ type throughputCollector struct { schedulingThroughputs []float64 labels map[string]string namespaces []string + errorMargin float64 } -func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector { +func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector { return &throughputCollector{ tb: tb, podInformer: podInformer, labels: labels, namespaces: namespaces, + errorMargin: errorMargin, } } @@ -388,9 +390,7 @@ func (tc *throughputCollector) run(ctx context.Context) { throughput := float64(newScheduled) / durationInSeconds expectedDuration := throughputSampleInterval * time.Duration(skipped+1) errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100 - // TODO: To prevent the perf-test failure, we increased the error margin, if still not enough - // one day, we should think of another approach to avoid this trick. - if math.Abs(errorMargin) > 30 { + if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin { // This might affect the result, report it. tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin) } From 0d41d509d2d96ccc3473924cb4e1b8e1b3e4c170 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 27 Jun 2023 15:21:32 +0200 Subject: [PATCH 6/6] scheduler_perf: replace gomega.Eventually with wait.PollUntilContextTimeout This is done for the sake of consistency. The failure message becomes less useful. 
--- .../scheduler_perf/scheduler_perf_test.go | 41 +++++-------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index 619d46bc39a..ecbfce6887b 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -31,7 +31,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" @@ -51,7 +50,6 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/legacyregistry" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" @@ -1221,41 +1219,22 @@ func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client cl // actually removing a namespace can take some time (garbage collecting // other generated object like secrets, etc.) and we don't want to // start the next workloads while that cleanup is still going on. - gomega.NewGomegaWithT(tb).Eventually(ctx, func(ctx context.Context) ([]interface{}, error) { - var objects []interface{} + if err := wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) if err != nil { - return nil, err + return false, err } - // Collecting several objects of interest (pods, claims) is done to - // provide a more informative failure message when a namespace doesn't - // disappear quickly enough. for _, namespace := range namespaces.Items { - if _, ok := numPodsScheduledPerNamespace[namespace.Name]; !ok { - // Not a namespace created by the workload. 
- continue + if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok { + // A namespace created by the workload, need to wait. + return false, nil } - pods, err := client.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - if len(pods.Items) > 0 { - // Record one pod per namespace - that's usually enough information. - objects = append(objects, pods.Items[0]) - } - if tc.FeatureGates[features.DynamicResourceAllocation] { - claims, err := client.ResourceV1alpha2().ResourceClaims(namespace.Name).List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err - } - if len(claims.Items) > 0 { - objects = append(objects, claims.Items[0]) - } - } - objects = append(objects, namespace) } - return objects, nil - }).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces") + // All namespaces gone. + return true, nil + }); err != nil { + tb.Fatalf("failed while waiting for namespace removal: %v", err) + } } func createNamespaceIfNotPresent(ctx context.Context, tb testing.TB, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {