diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 9ab4678ee71..2afc3fb6da6 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -1230,7 +1230,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
 			}
 			if !apierrors.IsNotFound(err) {
 				errCh <- err
-				utilruntime.HandleError(err)
+				utilruntime.HandleError(fmt.Errorf("removing tracking finalizer: %w", err))
 				return
 			}
 		}
diff --git a/test/integration/framework/util.go b/test/integration/framework/util.go
index 4bdbb4b6bcc..0fb7731b612 100644
--- a/test/integration/framework/util.go
+++ b/test/integration/framework/util.go
@@ -45,7 +45,7 @@ const (
 )
 
 // CreateNamespaceOrDie creates a namespace.
-func CreateNamespaceOrDie(c clientset.Interface, baseName string, t *testing.T) *v1.Namespace {
+func CreateNamespaceOrDie(c clientset.Interface, baseName string, t testing.TB) *v1.Namespace {
 	ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: baseName}}
 	result, err := c.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
 	if err != nil {
@@ -55,7 +55,7 @@ func CreateNamespaceOrDie(c clientset.Interface, baseName string, t *testing.T)
 }
 
 // DeleteNamespaceOrDie deletes a namespace.
-func DeleteNamespaceOrDie(c clientset.Interface, ns *v1.Namespace, t *testing.T) {
+func DeleteNamespaceOrDie(c clientset.Interface, ns *v1.Namespace, t testing.TB) {
 	err := c.CoreV1().Namespaces().Delete(context.TODO(), ns.Name, metav1.DeleteOptions{})
 	if err != nil {
 		t.Fatalf("Failed to delete namespace: %v", err)
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 89fcf711036..c8f6475cae2 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -437,7 +437,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
 	failedPod := jobPods[failedIndex]
 	updatedPod := failedPod.DeepCopy()
 	updatedPod.Status = podStatusMatchingOnExitCodesTerminateRule
-	err, _ = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
+	_, err = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
 	if err != nil {
 		t.Fatalf("Failed to update pod statuses %q for pods of job %q", err, klog.KObj(jobObj))
 	}
@@ -1084,6 +1084,85 @@ func TestIndexedJob(t *testing.T) {
 	}
 }
 
+// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
+// We expect that large jobs are more commonly used as Indexed. They are
+// also faster to track, as they need fewer API calls.
+func BenchmarkLargeIndexedJob(b *testing.B) {
+	defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
+	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
+	restConfig.QPS = 100
+	restConfig.Burst = 100
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer cancel()
+	backoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   1.5,
+		Steps:    30,
+		Cap:      5 * time.Minute,
+	}
+	mode := batchv1.IndexedCompletion
+	for _, nPods := range []int32{1000, 10_000} {
+		b.Run(fmt.Sprintf("nPods=%d", nPods), func(b *testing.B) {
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
+				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: fmt.Sprintf("npods-%d-%d", nPods, n),
+					},
+					Spec: batchv1.JobSpec{
+						Parallelism:    pointer.Int32Ptr(nPods),
+						Completions:    pointer.Int32Ptr(nPods),
+						CompletionMode: &mode,
+					},
+				})
+				if err != nil {
+					b.Fatalf("Failed to create Job: %v", err)
+				}
+				remaining := int(nPods)
+				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
+					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
+						remaining -= succ
+						b.Logf("Transient failure succeeding pods: %v", err)
+						return false, nil
+					}
+					return true, nil
+				}); err != nil {
+					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
+				}
+				validateJobSucceeded(ctx, b, clientSet, jobObj)
+
+				// Clean up Pods and Job.
+				b.StopTimer()
+				// Clean up pods in pages, because DeleteCollection might time out.
+				// #90743
+				for {
+					pods, err := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{Limit: 1})
+					if err != nil {
+						b.Fatalf("Failed to list Pods for cleanup: %v", err)
+					}
+					if len(pods.Items) == 0 {
+						break
+					}
+					err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
+						metav1.DeleteOptions{},
+						metav1.ListOptions{
+							Limit: 1000,
+						})
+					if err != nil {
+						b.Fatalf("Failed to cleanup Pods: %v", err)
+					}
+				}
+				err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+				if err != nil {
+					b.Fatalf("Failed to cleanup Job: %v", err)
+				}
+				b.StartTimer()
+			}
+		})
+	}
+}
+
 // TestDisableJobTrackingWithFinalizers ensures that when the
 // JobTrackingWithFinalizers feature is disabled, tracking finalizers are
 // removed from all pods, but Job continues to be tracked.
@@ -1723,12 +1802,12 @@ func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.In
 	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
 }
 
-func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
+func validateJobSucceeded(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job) {
 	t.Helper()
 	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete)
 }
 
-func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
+func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
 	t.Helper()
 	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
 		j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
@@ -1780,13 +1859,14 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
 			updates = append(updates, pod)
 		}
 	}
-	if len(updates) != cnt {
-		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt), 0
+	successful, err := updatePodStatuses(ctx, clientSet, updates)
+	if successful != cnt {
+		return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
 	}
-	return updatePodStatuses(ctx, clientSet, updates)
+	return err, successful
 }
 
-func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (error, int) {
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
 	wg := sync.WaitGroup{}
 	wg.Add(len(updates))
 	errCh := make(chan error, len(updates))
@@ -1808,10 +1888,10 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
 
 	select {
 	case err := <-errCh:
-		return fmt.Errorf("updating Pod status: %w", err), int(updated)
+		return int(updated), fmt.Errorf("updating Pod status: %w", err)
 	default:
 	}
-	return nil, int(updated)
+	return int(updated), nil
 }
 
 func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
@@ -1861,13 +1941,14 @@ func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, n
 	return clientSet.BatchV1().Jobs(ns).Create(ctx, jobObj, metav1.CreateOptions{})
 }
 
-func setup(t *testing.T, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
+func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
 
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 200
 	config.Burst = 200
+	config.Timeout = 0
 	clientSet, err := clientset.NewForConfig(config)
 	if err != nil {
 		t.Fatalf("Error creating clientset: %v", err)
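The signature changes in this patch (setup, validateJobSucceeded, validateJobCondition, CreateNamespaceOrDie, DeleteNamespaceOrDie switching from *testing.T to testing.TB) all rely on testing.TB being the standard-library interface implemented by both *testing.T and *testing.B, which is what lets BenchmarkLargeIndexedJob reuse the existing integration-test helpers. A minimal, self-contained sketch of that pattern, with hypothetical names that are not part of this patch:

package sketch

import "testing"

// newFixture stands in for a shared helper such as setup: it only needs
// Helper, Fatalf, and Cleanup, all of which testing.TB provides.
func newFixture(tb testing.TB) string {
	tb.Helper()
	name := "fixture"
	if name == "" {
		tb.Fatalf("failed to build fixture")
	}
	tb.Cleanup(func() {
		// Tear down shared resources here.
	})
	return name
}

// A test and a benchmark can now call the same helper.
func TestWithFixture(t *testing.T) {
	_ = newFixture(t) // *testing.T satisfies testing.TB
}

func BenchmarkWithFixture(b *testing.B) {
	f := newFixture(b) // *testing.B satisfies testing.TB
	for n := 0; n < b.N; n++ {
		_ = f
	}
}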