From 3d0894fabf96013bf5871a59f01f433a76361edc Mon Sep 17 00:00:00 2001
From: Kante Yin
Date: Tue, 31 Jan 2023 08:21:00 +0800
Subject: [PATCH] Fix failure (context canceled) in scheduler_perf benchmark
 (#114843)

* Fix failure in scheduler_perf benchmark

Signed-off-by: Kante Yin

* Fail fatally on errors when cleaning up nodes in scheduler perf tests

Signed-off-by: Kante Yin

* Use a derived context to better organize the code

Signed-off-by: Kante Yin

* Change log level to 2 in scheduler perf tests

Signed-off-by: Kante Yin

---------

Signed-off-by: Kante Yin
---
 test/integration/scheduler_perf/main_test.go |  7 ++++
 .../scheduler_perf/scheduler_perf_test.go    | 32 +++++++++++--------
 test/integration/scheduler_perf/util.go      | 10 +++---
 test/integration/util/util.go                | 17 ++--------
 4 files changed, 33 insertions(+), 33 deletions(-)

diff --git a/test/integration/scheduler_perf/main_test.go b/test/integration/scheduler_perf/main_test.go
index 16275396eba..09f3a87136f 100644
--- a/test/integration/scheduler_perf/main_test.go
+++ b/test/integration/scheduler_perf/main_test.go
@@ -17,11 +17,18 @@ limitations under the License.
 package benchmark
 
 import (
+	"flag"
 	"testing"
 
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/test/integration/framework"
 )
 
 func TestMain(m *testing.M) {
+	// Run with -v=2, which is the default log level in production.
+	ktesting.DefaultConfig = ktesting.NewConfig(ktesting.Verbosity(2))
+	ktesting.DefaultConfig.AddFlags(flag.CommandLine)
+	flag.Parse()
+
 	framework.EtcdMain(m.Run)
 }
diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go
index 0901e870095..2eb1b29c76f 100644
--- a/test/integration/scheduler_perf/scheduler_perf_test.go
+++ b/test/integration/scheduler_perf/scheduler_perf_test.go
@@ -584,10 +584,14 @@ func BenchmarkPerfScheduling(b *testing.B) {
 		b.Run(tc.Name, func(b *testing.B) {
 			for _, w := range tc.Workloads {
 				b.Run(w.Name, func(b *testing.B) {
+					// 30 minutes should be plenty enough even for the 5000-node tests.
+					ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Minute)
+					b.Cleanup(cancel)
+
 					for feature, flag := range tc.FeatureGates {
 						defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
 					}
-					dataItems.DataItems = append(dataItems.DataItems, runWorkload(b, tc, w)...)
+					dataItems.DataItems = append(dataItems.DataItems, runWorkload(ctx, b, tc, w)...)
 					// Reset metrics to prevent metrics generated in current workload gets
 					// carried over to the next workload.
 					legacyregistry.Reset()
@@ -639,10 +643,7 @@ func unrollWorkloadTemplate(b *testing.B, wt []op, w *workload) []op {
 	return unrolled
 }
 
-func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
-	// 30 minutes should be plenty enough even for the 5000-node tests.
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
-	defer cancel()
+func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) []DataItem {
 	var cfg *config.KubeSchedulerConfiguration
 	var err error
 	if tc.SchedulerConfigPath != nil {
@@ -654,7 +655,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 			b.Fatalf("validate scheduler config file failed: %v", err)
 		}
 	}
-	finalFunc, podInformer, client, dynClient := mustSetupScheduler(b, cfg)
+	finalFunc, podInformer, client, dynClient := mustSetupScheduler(ctx, b, cfg)
 	b.Cleanup(finalFunc)
 
 	var mu sync.Mutex
@@ -665,7 +666,7 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 	numPodsScheduledPerNamespace := make(map[string]int)
 	b.Cleanup(func() {
 		for namespace := range numPodsScheduledPerNamespace {
-			if err := client.CoreV1().Namespaces().Delete(context.Background(), namespace, metav1.DeleteOptions{}); err != nil {
+			if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
 				b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
 			}
 		}
@@ -691,7 +692,9 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 				b.Fatalf("op %d: %v", opIndex, err)
 			}
 			b.Cleanup(func() {
-				_ = nodePreparer.CleanupNodes(ctx)
+				if err := nodePreparer.CleanupNodes(ctx); err != nil {
+					b.Fatalf("failed to clean up nodes, error: %v", err)
+				}
 			})
 			nextNodeIndex += concreteOp.Count
 
@@ -700,8 +703,8 @@ func runWorkload(b *testing.B, tc *testCase, w *workload) []DataItem {
 			if err != nil {
 				b.Fatalf("op %d: %v", opIndex, err)
 			}
-			if err := nsPreparer.prepare(); err != nil {
-				nsPreparer.cleanup()
+			if err := nsPreparer.prepare(ctx); err != nil {
+				nsPreparer.cleanup(ctx)
 				b.Fatalf("op %d: %v", opIndex, err)
 			}
 			for _, n := range nsPreparer.namespaces() {
@@ -985,6 +988,7 @@ func waitUntilPodsScheduledInNamespace(ctx context.Context, b *testing.B, podInf
 			return false, err
 		}
 		if len(scheduled) >= wantCount {
+			b.Logf("scheduling succeeded")
 			return true, nil
 		}
 		b.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
@@ -1195,7 +1199,7 @@ func (p *namespacePreparer) namespaces() []string {
 }
 
 // prepare creates the namespaces.
-func (p *namespacePreparer) prepare() error {
+func (p *namespacePreparer) prepare(ctx context.Context) error {
 	base := &v1.Namespace{}
 	if p.spec != nil {
 		base = p.spec
@@ -1205,7 +1209,7 @@ func (p *namespacePreparer) prepare() error {
 		n := base.DeepCopy()
 		n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
 		if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
-			_, err := p.client.CoreV1().Namespaces().Create(context.Background(), n, metav1.CreateOptions{})
+			_, err := p.client.CoreV1().Namespaces().Create(ctx, n, metav1.CreateOptions{})
 			return err == nil || apierrors.IsAlreadyExists(err), nil
 		}); err != nil {
 			return err
@@ -1215,11 +1219,11 @@ func (p *namespacePreparer) prepare() error {
 }
 
 // cleanup deletes existing test namespaces.
-func (p *namespacePreparer) cleanup() error {
+func (p *namespacePreparer) cleanup(ctx context.Context) error {
 	var errRet error
 	for i := 0; i < p.count; i++ {
 		n := fmt.Sprintf("%s-%d", p.prefix, i)
-		if err := p.client.CoreV1().Namespaces().Delete(context.Background(), n, metav1.DeleteOptions{}); err != nil {
+		if err := p.client.CoreV1().Namespaces().Delete(ctx, n, metav1.DeleteOptions{}); err != nil {
 			p.t.Errorf("Deleting Namespace: %v", err)
 			errRet = err
 		}
diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go
index b847a68fd1f..73db210fa5f 100644
--- a/test/integration/scheduler_perf/util.go
+++ b/test/integration/scheduler_perf/util.go
@@ -75,7 +75,8 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
 // remove resources after finished.
 // Notes on rate limiter:
 // - client rate limit is set to 5000.
-func mustSetupScheduler(b *testing.B, config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
+func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
+	ctx, cancel := context.WithCancel(ctx)
 	// Run API server with minimimal logging by default. Can be raised with -v.
 	framework.MinVerbosity = 0
 
@@ -106,12 +107,11 @@ func mustSetupScheduler(b *testing.B, config *config.KubeSchedulerConfiguration)
 
 	// Not all config options will be effective but only those mostly related with scheduler performance will
 	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
-	_, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config)
-	fakePVControllerShutdown := util.StartFakePVController(client)
+	_, podInformer := util.StartScheduler(ctx, client, cfg, config)
+	util.StartFakePVController(ctx, client)
 
 	shutdownFn := func() {
-		fakePVControllerShutdown()
-		schedulerShutdown()
+		cancel()
 		tearDownFn()
 	}
 
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index ae2fa61b8e7..46750857e7b 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -67,9 +67,7 @@ type ShutdownFunc func()
 
 // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
 // and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it.
-func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) {
-	ctx, cancel := context.WithCancel(context.Background())
-
+func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer) {
 	informerFactory := scheduler.NewInformerFactory(clientSet, 0)
 	evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
 		Interface: clientSet.EventsV1()})
@@ -97,24 +95,16 @@ func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config
 	informerFactory.WaitForCacheSync(ctx.Done())
 	go sched.Run(ctx)
 
-	shutdownFunc := func() {
-		klog.Infof("destroying scheduler")
-		cancel()
-		klog.Infof("destroyed scheduler")
-	}
-	return sched, informerFactory.Core().V1().Pods(), shutdownFunc
+	return sched, informerFactory.Core().V1().Pods()
 }
 
 // StartFakePVController is a simplified pv controller logic that sets PVC VolumeName and annotation for each PV binding.
 // TODO(mborsz): Use a real PV controller here.
-func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
-	ctx, cancel := context.WithCancel(context.Background())
-
+func StartFakePVController(ctx context.Context, clientSet clientset.Interface) {
 	informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
 	pvInformer := informerFactory.Core().V1().PersistentVolumes()
 
 	syncPV := func(obj *v1.PersistentVolume) {
-		ctx := context.Background()
 		if obj.Spec.ClaimRef != nil {
 			claimRef := obj.Spec.ClaimRef
 			pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
@@ -145,7 +135,6 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
 	})
 
 	informerFactory.Start(ctx.Done())
-	return ShutdownFunc(cancel)
 }
 
 // TestContext store necessary context info
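
For readers outside the Kubernetes tree, the pattern this commit adopts can be summarized with a minimal, self-contained sketch (not part of the patch): the benchmark derives a single context with a generous timeout, registers its cancel function with b.Cleanup, and passes that context down to every helper, instead of each helper creating its own context.Background() and returning a ShutdownFunc. The names startHelper and doWork below are hypothetical stand-ins for helpers such as StartScheduler/StartFakePVController and the workload code.

// derived_context_sketch_test.go (illustrative only)
package example

import (
	"context"
	"testing"
	"time"
)

// startHelper runs until ctx is cancelled; it no longer needs to return a
// shutdown function, because cancelling the parent context stops it.
func startHelper(ctx context.Context) {
	go func() {
		<-ctx.Done()
		// Tear down whatever the helper started.
	}()
}

// doWork stands in for one workload step; it respects ctx cancellation.
func doWork(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func BenchmarkWithDerivedContext(b *testing.B) {
	// One generous timeout for the whole benchmark; b.Cleanup cancels the
	// context when the sub-benchmark finishes, which shuts down everything
	// started from it.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	b.Cleanup(cancel)

	startHelper(ctx)

	for i := 0; i < b.N; i++ {
		if err := doWork(ctx); err != nil {
			b.Fatalf("work failed: %v", err)
		}
	}
}

As in the patch, cancellation flows top-down: a single b.Cleanup(cancel) replaces the per-helper shutdown functions that the old code had to call one by one, and helpers no longer race against a context they created themselves.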