From dfd646e0a89d21d303422a51aeba0364b0551da9 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Thu, 23 Mar 2023 16:59:30 +0100
Subject: [PATCH] scheduler_perf: fix namespace deletion

Merely deleting the namespace is not enough:

- Workloads might rely on the garbage collector to get rid of obsolete
  objects, so we should run it to be on the safe side.
- Pods must be force-deleted because kubelet is not running.
- Finally, the namespace controller is needed to get rid of deleted
  namespaces.

---
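A minimal usage sketch for reviewers: the two helpers added below in
test/integration/util/util.go construct the controllers first and return run
functions that must only be invoked after the shared informer factory has been
started, which is the order mustSetupCluster follows. The wrapper function
startCleanupControllers and the standalone package are illustrative only, not
code from this patch:

    package example // illustrative sketch, not part of the patch

    import (
    	"context"
    	"testing"

    	"k8s.io/client-go/informers"
    	restclient "k8s.io/client-go/rest"
    	"k8s.io/kubernetes/test/integration/util"
    )

    // startCleanupControllers wires up the garbage collector and namespace
    // controllers for an integration test: construct them, start the shared
    // informers, then invoke the returned run functions.
    func startCleanupControllers(ctx context.Context, tb testing.TB, cfg *restclient.Config, informerFactory informers.SharedInformerFactory) {
    	runGC := util.CreateGCController(ctx, tb, *cfg, informerFactory)
    	runNS := util.CreateNamespaceController(ctx, tb, *cfg, informerFactory)

    	// The run functions rely on informers, so the factory has to be
    	// started (and synced) before they are called.
    	informerFactory.Start(ctx.Done())
    	informerFactory.WaitForCacheSync(ctx.Done())

    	go runGC()
    	go runNS()
    }

With the controllers running, cleanupWorkload in scheduler_perf_test.go only
has to issue the deletions and wait for the namespaces to disappear.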
 test/integration/job/job_test.go              | 43 +---------
 .../scheduler_perf/scheduler_perf_test.go     | 80 ++++++++++++++++---
 test/integration/scheduler_perf/util.go       |  8 +-
 test/integration/util/util.go                 | 66 +++++++++++++++
 4 files changed, 146 insertions(+), 51 deletions(-)

diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index f1e8435e09b..215c369a082 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -41,27 +41,22 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/util/feature"
-	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
-	"k8s.io/client-go/metadata"
-	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
-	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	basemetrics "k8s.io/component-base/metrics"
 	"k8s.io/component-base/metrics/testutil"
-	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
+	"k8s.io/kubernetes/test/integration/util"
 	"k8s.io/utils/pointer"
 )

@@ -1313,7 +1308,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	defer cancel()
 	restConfig.QPS = 200
 	restConfig.Burst = 200
-	runGC := createGC(ctx, t, restConfig, informerSet)
+	runGC := util.CreateGCController(ctx, t, *restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
 	runGC()
@@ -2092,40 +2087,6 @@ func createJobControllerWithSharedInformers(restConfig *restclient.Config, infor
 	return jc, ctx, cancel
 }

-func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
-	restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
-	clientSet := clientset.NewForConfigOrDie(restConfig)
-	metadataClient, err := metadata.NewForConfig(restConfig)
-	if err != nil {
-		t.Fatalf("Failed to create metadataClient: %v", err)
-	}
-	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
-	restMapper.Reset()
-	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
-	alwaysStarted := make(chan struct{})
-	close(alwaysStarted)
-	gc, err := garbagecollector.NewGarbageCollector(
-		clientSet,
-		metadataClient,
-		restMapper,
-		garbagecollector.DefaultIgnoredResources(),
-		informerfactory.NewInformerFactory(informerSet, metadataInformers),
-		alwaysStarted,
-	)
-	if err != nil {
-		t.Fatalf("Failed creating garbage collector")
-	}
-	startGC := func() {
-		syncPeriod := 5 * time.Second
-		go wait.Until(func() {
-			restMapper.Reset()
-		}, syncPeriod, ctx.Done())
-		go gc.Run(ctx, 1)
-		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
-	}
-	return startGC
-}
-
 func hasJobTrackingFinalizer(obj metav1.Object) bool {
 	for _, fin := range obj.GetFinalizers() {
 		if fin == batchv1.JobTrackingFinalizer {
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index ba051aa8c9c..bd02232eeee 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -30,6 +30,8 @@ import (
 	"testing"
 	"time"

+	"github.com/onsi/gomega"
+
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -47,6 +49,7 @@ import (
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/component-base/metrics/legacyregistry"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@@ -794,11 +797,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
 			b.Fatalf("validate scheduler config file failed: %v", err)
 		}
 	}
-	informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates)
+	informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates)

 	// Additional informers needed for testing. The pod informer was
 	// already created before (scheduler.NewInformerFactory) and the
-	// factory was started for it (mustSetupScheduler), therefore we don't
+	// factory was started for it (mustSetupCluster), therefore we don't
 	// need to start again.
 	podInformer := informerFactory.Core().V1().Pods()

@@ -816,13 +819,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
 	numPodsScheduledPerNamespace := make(map[string]int)

 	if cleanup {
-		b.Cleanup(func() {
-			for namespace := range numPodsScheduledPerNamespace {
-				if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
-					b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
-				}
-			}
-		})
+		// This must run before controllers get shut down.
+		defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace)
 	}

 	for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) {
@@ -1089,6 +1087,70 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
 	return dataItems
 }

+// cleanupWorkload ensures that everything is removed from the API server that
+// might have been created by runWorkload. This must be done before starting
+// the next workload because otherwise it might stumble over previously created
+// objects. For example, the namespaces are the same in different workloads, so
+// not deleting them would cause the next one to fail with "cannot create
+// namespace: already exists".
+//
+// Calling cleanupWorkload can be skipped if it is known that the next workload
+// will run with a fresh etcd instance.
+func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) {
+	deleteNow := *metav1.NewDeleteOptions(0)
+	for namespace := range numPodsScheduledPerNamespace {
+		// Pods have to be deleted explicitly, with no grace period. Normally
+		// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
+		// to remove a deleted pod, but we don't run kubelet...
+		if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil {
+			tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
+		}
+		if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil {
+			tb.Fatalf("failed to delete namespace %q: %v", namespace, err)
+		}
+	}
+
+	// We need to wait here because even with deletion timestamp set,
+	// actually removing a namespace can take some time (garbage collecting
+	// other generated objects like secrets, etc.) and we don't want to
+	// start the next workloads while that cleanup is still going on.
+	gomega.NewGomegaWithT(tb).Eventually(ctx, func(ctx context.Context) ([]interface{}, error) {
+		var objects []interface{}
+		namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return nil, err
+		}
+		// Collecting several objects of interest (pods, claims) is done to
+		// provide a more informative failure message when a namespace doesn't
+		// disappear quickly enough.
+		for _, namespace := range namespaces.Items {
+			if _, ok := numPodsScheduledPerNamespace[namespace.Name]; !ok {
+				// Not a namespace created by the workload.
+				continue
+			}
+			pods, err := client.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{})
+			if err != nil {
+				return nil, err
+			}
+			if len(pods.Items) > 0 {
+				// Record one pod per namespace - that's usually enough information.
+				objects = append(objects, pods.Items[0])
+			}
+			if tc.FeatureGates[features.DynamicResourceAllocation] {
+				claims, err := client.ResourceV1alpha2().ResourceClaims(namespace.Name).List(ctx, metav1.ListOptions{})
+				if err != nil {
+					return nil, err
+				}
+				if len(claims.Items) > 0 {
+					objects = append(objects, claims.Items[0])
+				}
+			}
+			objects = append(objects, namespace)
+		}
+		return objects, nil
+	}).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces")
+}
+
 func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
 	if _, ok := (*podsPerNamespace)[namespace]; !ok {
 		// The namespace has not created yet.
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index e9fb5273be8..91d6edc78ad 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -76,11 +76,13 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
 // mustSetupScheduler starts the following components:
 // - k8s api server
 // - scheduler
+// - some of the kube-controller-manager controllers
+//
 // It returns regular and dynamic clients, and destroyFunc which should be used to
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
+func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
 	// Run API server with minimimal logging by default. Can be raised with -v.
 	framework.MinVerbosity = 0

@@ -126,6 +128,8 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSc
 	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
 	_, informerFactory := util.StartScheduler(ctx, client, cfg, config)
 	util.StartFakePVController(ctx, client, informerFactory)
+	runGC := util.CreateGCController(ctx, b, *cfg, informerFactory)
+	runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory)

 	runResourceClaimController := func() {}
 	if enabledFeatures[features.DynamicResourceAllocation] {
@@ -136,6 +140,8 @@ func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSc
 	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())

+	go runGC()
+	go runNS()
 	go runResourceClaimController()

 	return informerFactory, client, dynClient
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index 4a0451d326d..0875350d3c5 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -36,17 +36,22 @@ import (
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/metadata"
+	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/events"
 	pvutil "k8s.io/component-helpers/storage/volume"
+	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	"k8s.io/kube-scheduler/config/v1beta3"
 	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/disruption"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector"
+	"k8s.io/kubernetes/pkg/controller/namespace"
 	"k8s.io/kubernetes/pkg/controller/resourceclaim"
 	"k8s.io/kubernetes/pkg/controlplane"
 	"k8s.io/kubernetes/pkg/scheduler"
@@ -162,6 +167,67 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
 	})
 }

+// CreateGCController creates a garbage collector and returns a run function
+// for it. The informer factory needs to be started before invoking that
+// function.
+func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restclient.AddUserAgent(&restConfig, "gc-controller")
+	clientSet := clientset.NewForConfigOrDie(&restConfig)
+	metadataClient, err := metadata.NewForConfig(&restConfig)
+	if err != nil {
+		tb.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
+	restMapper.Reset()
+	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := garbagecollector.NewGarbageCollector(
+		clientSet,
+		metadataClient,
+		restMapper,
+		garbagecollector.DefaultIgnoredResources(),
+		informerfactory.NewInformerFactory(informerSet, metadataInformers),
+		alwaysStarted,
+	)
+	if err != nil {
+		tb.Fatalf("Failed creating garbage collector: %v", err)
+	}
+	startGC := func() {
+		syncPeriod := 5 * time.Second
+		go wait.Until(func() {
+			restMapper.Reset()
+		}, syncPeriod, ctx.Done())
+		go gc.Run(ctx, 1)
+		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
+	}
+	return startGC
+}
+
+// CreateNamespaceController creates a namespace controller and returns a run
+// function for it. The informer factory needs to be started before invoking
+// that function.
+func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restclient.AddUserAgent(&restConfig, "namespace-controller")
+	clientSet := clientset.NewForConfigOrDie(&restConfig)
+	metadataClient, err := metadata.NewForConfig(&restConfig)
+	if err != nil {
+		tb.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
+	controller := namespace.NewNamespaceController(
+		ctx,
+		clientSet,
+		metadataClient,
+		discoverResourcesFn,
+		informerSet.Core().V1().Namespaces(),
+		10*time.Hour,
+		v1.FinalizerKubernetes)
+	return func() {
+		go controller.Run(ctx, 5)
+	}
+}
+
 // TestContext store necessary context info
 type TestContext struct {
 	NS                 *v1.Namespace