From 168e016947c02b1a76e7716246afaaca27cfead4 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 23 Oct 2023 11:46:19 +0200
Subject: [PATCH] Benchmark job with backoff limit per index

---
 test/integration/job/job_test.go | 202 +++++++++++++++++++++++++------
 1 file changed, 166 insertions(+), 36 deletions(-)

diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index de2339eb001..4067c82b994 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -2097,25 +2097,53 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 		Steps:    30,
 		Cap:      5 * time.Minute,
 	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+	}{
+		"regular indexed job without failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex without failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job without failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex without failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
 	mode := batchv1.IndexedCompletion
-	for _, nPods := range []int32{1000, 10_000} {
-		b.Run(fmt.Sprintf("nPods=%d", nPods), func(b *testing.B) {
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
 			b.ResetTimer()
 			for n := 0; n < b.N; n++ {
+				b.StartTimer()
 				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: fmt.Sprintf("npods-%d-%d", nPods, n),
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
 					},
 					Spec: batchv1.JobSpec{
-						Parallelism:    ptr.To(nPods),
-						Completions:    ptr.To(nPods),
-						CompletionMode: &mode,
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
 					},
 				})
 				if err != nil {
 					b.Fatalf("Failed to create Job: %v", err)
 				}
-				remaining := int(nPods)
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				remaining := int(tc.nPods)
 				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
 					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
 						remaining -= succ
@@ -2127,38 +2155,134 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
 				}
 				validateJobSucceeded(ctx, b, clientSet, jobObj)
-
-				// Cleanup Pods and Job.
 				b.StopTimer()
-				// Clean up pods in pages, because DeleteCollection might timeout.
-				// #90743
-				for {
-					pods, err := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{Limit: 1})
-					if err != nil {
-						b.Fatalf("Failed to list Pods for cleanup: %v", err)
-					}
-					if len(pods.Items) == 0 {
-						break
-					}
-					err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-						metav1.DeleteOptions{},
-						metav1.ListOptions{
-							Limit: 1000,
-						})
-					if err != nil {
-						b.Fatalf("Failed to cleanup Pods: %v", err)
-					}
-				}
-				err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
-				if err != nil {
-					b.Fatalf("Failed to cleanup Job: %v", err)
-				}
-				b.StartTimer()
 			}
 		})
 	}
 }
 
+// BenchmarkLargeFailureHandling benchmarks the handling of numerous pod failures
+// of an Indexed Job. We set minimal backoff delay to make the job controller
+// performance comparable for indexed jobs with global backoffLimit, and those
+// with backoffLimit per-index, despite different patterns of handling failures.
+func BenchmarkLargeFailureHandling(b *testing.B) {
+	b.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+	b.Cleanup(setDurationDuringTest(&jobcontroller.MaxJobPodFailureBackOff, fastPodFailureBackoff))
+	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
+	restConfig.QPS = 100
+	restConfig.Burst = 100
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
+	defer cancel()
+	backoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   1.5,
+		Steps:    30,
+		Cap:      5 * time.Minute,
+	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+		customTimeout        *time.Duration
+	}{
+		"regular indexed job with failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex with failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job with failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex with failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
+	mode := batchv1.IndexedCompletion
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			timeout := ptr.Deref(tc.customTimeout, wait.ForeverTestTimeout)
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
+				b.StopTimer()
+				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
+					},
+					Spec: batchv1.JobSpec{
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
+						BackoffLimit:         ptr.To(tc.nPods),
+					},
+				})
+				if err != nil {
+					b.Fatalf("Failed to create Job: %v", err)
+				}
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+
+				b.StartTimer()
+				remaining := int(tc.nPods)
+				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
+					if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
+						remaining -= fail
+						b.Logf("Transient failure failing pods: %v", err)
+						return false, nil
+					}
+					return true, nil
+				}); err != nil {
+					b.Fatalf("Could not fail the remaining %d pods: %v", remaining, err)
+				}
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Failed:      int(tc.nPods),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+				b.StopTimer()
+			}
+		})
+	}
+}
+
+// cleanUp deletes all pods and the job
+func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
+	// Clean up pods in pages, because DeleteCollection might timeout.
+	// #90743
+	for {
+		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{Limit: 1})
+		if err != nil {
+			return err
+		}
+		if len(pods.Items) == 0 {
+			break
+		}
+		err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx,
+			metav1.DeleteOptions{},
+			metav1.ListOptions{
+				Limit: 1000,
+			})
+		if err != nil {
+			return err
+		}
+	}
+	return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+}
+
 func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
 		t.Run(string(policy), func(t *testing.T) {
@@ -2617,10 +2741,15 @@ type podsByStatus struct {
 	Terminating *int32
 }
 
-func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+func validateJobsPodsStatusOnly(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+	t.Helper()
+	validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, desired, wait.ForeverTestTimeout)
+}
+
+func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, timeout time.Duration) {
 	t.Helper()
 	var actualCounts podsByStatus
-	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, waitInterval, timeout, true, func(ctx context.Context) (bool, error) {
 		updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
 		if err != nil {
 			t.Fatalf("Failed to get updated Job: %v", err)
@@ -2638,7 +2767,8 @@ func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet cli
 		t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
 	}
 }
-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+
+func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 	t.Helper()
 	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
 	var active []*v1.Pod
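
Note (not part of the patch): for readers who want to see the feature the new benchmark exercises in isolation, below is a minimal, self-contained sketch of a Job spec using backoffLimitPerIndex. It is illustrative only: the package layout, the Job name, and the concrete values are made up; the fields themselves (CompletionMode, Completions, Parallelism, BackoffLimit, BackoffLimitPerIndex) mirror what the benchmark sets above.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

func main() {
	// Illustrative Indexed Job: each index may be retried at most once
	// (backoffLimitPerIndex=1) before that index is marked as failed,
	// independently of the other indexes.
	indexed := batchv1.IndexedCompletion
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "per-index-backoff-example"},
		Spec: batchv1.JobSpec{
			CompletionMode:       &indexed,
			Completions:          ptr.To[int32](10),
			Parallelism:          ptr.To[int32](10),
			BackoffLimit:         ptr.To[int32](10),
			BackoffLimitPerIndex: ptr.To[int32](1),
		},
	}
	fmt.Printf("job %q: backoffLimitPerIndex=%d\n", job.Name, *job.Spec.BackoffLimitPerIndex)
}

As in the benchmark, backoffLimitPerIndex only takes effect when the JobBackoffLimitPerIndex feature gate is enabled; the test code above toggles it per case via featuregatetesting.SetFeatureGateDuringTest. The benchmarks themselves are standard Go benchmarks in test/integration/job and run under the usual Kubernetes integration-test environment (a local etcd), for example with go test -bench and -run '^$' to skip the regular tests.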