From 99eea8054b1787eb4424f583c34eba9fc70e49ed Mon Sep 17 00:00:00 2001 From: Tomas Tormo Date: Mon, 29 Jul 2024 13:01:30 +0000 Subject: [PATCH] Improve Job integration tests runtime --- test/integration/job/job_test.go | 137 +++++++++++++++++++------------ 1 file changed, 85 insertions(+), 52 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 07c3f286994..def11e6f326 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -424,16 +424,17 @@ func TestJobPodFailurePolicy(t *testing.T) { }, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "pod-failure-policy") + t.Cleanup(closeFn) for name, test := range testCases { t.Run(name, func(t *testing.T) { resetMetrics() - closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) if err != nil { @@ -758,18 +759,18 @@ func TestSuccessPolicy(t *testing.T) { wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed}, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "success-policy") + t.Cleanup(closeFn) for name, tc := range testCases { t.Run(name, func(t *testing.T) { resetMetrics() featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex) - closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { - cancel() - }() + t.Cleanup(cancel) + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &tc.job) if err != nil { t.Fatalf("Error %v while creating the Job %q", err, jobObj.Name) @@ -1177,11 +1178,12 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) // are terminal. The fate of the Job is indicated by the interim Job conditions: // FailureTarget, or SuccessCriteriaMet. func TestDelayTerminalPhaseCondition(t *testing.T) { + const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion" t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) podTemplateSpec := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -1512,6 +1514,9 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { }, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition") + t.Cleanup(closeFn) for name, test := range testCases { t.Run(name, func(t *testing.T) { resetMetrics() @@ -1520,8 +1525,6 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, test.enableJobSuccessPolicy) - closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition") - t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) t.Cleanup(cancel) @@ -1529,7 +1532,11 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { if err != nil { t.Fatalf("Error %q while creating the job %q", err, jobObj.Name) } - t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) }) + t.Cleanup(func() { + if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil { + t.Fatalf("Failed cleanup: %v", err) + } + }) jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) waitForPodsToBeActive(ctx, t, jobClient, *jobObj.Spec.Parallelism, jobObj) @@ -1898,17 +1905,17 @@ func TestBackoffLimitPerIndex(t *testing.T) { }, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index") + t.Cleanup(closeFn) for name, test := range testCases { t.Run(name, func(t *testing.T) { resetMetrics() featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true) - closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { - cancel() - }() + t.Cleanup(cancel) + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) if err != nil { t.Fatalf("Error %q while creating the job %q", err, jobObj.Name) @@ -1969,6 +1976,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { // reconcile or skip reconciliation of the Job depending on the Job's managedBy // field, and the enablement of the JobManagedBy feature gate. func TestManagedBy(t *testing.T) { + const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion" customControllerName := "example.com/custom-job-controller" podTemplateSpec := v1.PodTemplateSpec{ Spec: v1.PodSpec{ @@ -2061,20 +2069,29 @@ func TestManagedBy(t *testing.T) { }, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "managed-by") + t.Cleanup(closeFn) for name, test := range testCases { t.Run(name, func(t *testing.T) { resetMetrics() featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy) - closeFn, restConfig, clientSet, ns := setup(t, "managed-by") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) if err != nil { t.Fatalf("Error %v while creating the job %q", err, klog.KObj(jobObj)) } + t.Cleanup(func() { + // Wait for cleanup to finish to prevent this job from affecting metrics + if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil { + t.Fatalf("Failed cleanup: %v", err) + } + }) + if test.wantReconciledByBuiltInController { validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: int(*jobObj.Spec.Parallelism), @@ -2814,6 +2831,7 @@ func TestIndexedJob(t *testing.T) { } func TestJobPodReplacementPolicy(t *testing.T) { + const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion" t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) indexedCompletion := batchv1.IndexedCompletion nonIndexedCompletion := batchv1.NonIndexedCompletion @@ -2844,7 +2862,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { CompletionMode: &indexedCompletion, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, }, @@ -2869,7 +2887,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, }, @@ -2897,7 +2915,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, }, @@ -2925,7 +2943,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, PodFailurePolicy: &batchv1.PodFailurePolicy{ @@ -2959,7 +2977,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, }, @@ -2987,7 +3005,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Finalizers: []string{"fake.example.com/blockDeletion"}, + Finalizers: []string{blockDeletionFinalizerForTest}, }, }, }, @@ -3007,13 +3025,14 @@ func TestJobPodReplacementPolicy(t *testing.T) { }, }, } + + closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy") + t.Cleanup(closeFn) for name, tc := range cases { tc := tc t.Run(name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled) - closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy") - t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) t.Cleanup(cancel) resetMetrics() @@ -3027,7 +3046,11 @@ func TestJobPodReplacementPolicy(t *testing.T) { jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj) - t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) }) + t.Cleanup(func() { + if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil { + t.Fatalf("Failed cleanup: %v", err) + } + }) deletePods(ctx, t, clientSet, ns.Name) @@ -3218,11 +3241,11 @@ func TestElasticIndexedJob(t *testing.T) { }, } + closeFn, restConfig, clientSet, ns := setup(t, "indexed") + t.Cleanup(closeFn) for name, tc := range cases { tc := tc t.Run(name, func(t *testing.T) { - closeFn, restConfig, clientSet, ns := setup(t, "indexed") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) defer cancel() resetMetrics() @@ -3357,7 +3380,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) { b.Fatalf("Failed to create Job: %v", err) } b.Cleanup(func() { - if err := cleanUp(ctx, clientSet, jobObj); err != nil { + if err := cleanUp(ctx, clientSet, jobObj, []string{batchv1.JobTrackingFinalizer}); err != nil { b.Fatalf("Failed cleanup: %v", err) } }) @@ -3443,7 +3466,7 @@ func BenchmarkLargeFailureHandling(b *testing.B) { b.Fatalf("Failed to create Job: %v", err) } b.Cleanup(func() { - if err := cleanUp(ctx, clientSet, jobObj); err != nil { + if err := cleanUp(ctx, clientSet, jobObj, []string{batchv1.JobTrackingFinalizer}); err != nil { b.Fatalf("Failed cleanup: %v", err) } }) @@ -3477,8 +3500,11 @@ func BenchmarkLargeFailureHandling(b *testing.B) { } } -// cleanUp deletes all pods and the job -func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error { +// cleanUp removes the specified pod finalizers, then deletes all pods and the job. +func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, podFinalizersToRemove []string) error { + if err := removePodsFinalizers(ctx, clientSet, jobObj.Namespace, podFinalizersToRemove); err != nil { + return err + } // Clean up pods in pages, because DeleteCollection might timeout. // #90743 for { @@ -3498,14 +3524,18 @@ func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1 return err } } - return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{}) + + // Set the propagation policy to background to ensure that the job doesn't receive any finalizer at deletion time + return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{ + PropagationPolicy: ptr.To(metav1.DeletePropagationBackground), + }) } func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { + closeFn, restConfig, clientSet, ns := setup(t, "orphan-pod-finalizers") + t.Cleanup(closeFn) for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} { t.Run(string(policy), func(t *testing.T) { - closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0) // Make the job controller significantly slower to trigger race condition. restConfig.QPS = 1 @@ -3814,11 +3844,11 @@ func TestSuspendJob(t *testing.T) { }, } + closeFn, restConfig, clientSet, ns := setup(t, "suspend") + t.Cleanup(closeFn) for _, tc := range testCases { name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag) t.Run(name, func(t *testing.T) { - closeFn, restConfig, clientSet, ns := setup(t, "suspend") - defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) defer cancel() events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{}) @@ -4333,7 +4363,7 @@ func getCompletionIndex(p *v1.Pod) (int, error) { func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) { if jobObj.Name == "" { - jobObj.Name = "test-job" + jobObj.GenerateName = "test-job" } if len(jobObj.Spec.Template.Spec.Containers) == 0 { jobObj.Spec.Template.Spec.Containers = []v1.Container{ @@ -4468,23 +4498,25 @@ func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface } } -func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) { - t.Helper() +func removePodsFinalizers(ctx context.Context, clientSet clientset.Interface, namespace string, finalizersNames []string) error { pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err != nil { - t.Fatalf("Failed to list pods: %v", err) + return err } - updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) { - for i, finalizer := range pod.Finalizers { - if finalizer == "fake.example.com/blockDeletion" { - pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...) + + finalizersSet := sets.New(finalizersNames...) + return updatePod(ctx, clientSet, pods.Items, func(pod *v1.Pod) { + podFinalizers := []string{} + for _, podFinalizer := range pod.Finalizers { + if _, found := finalizersSet[podFinalizer]; !found { + podFinalizers = append(podFinalizers, podFinalizer) } } + pod.Finalizers = podFinalizers }) } -func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) { - t.Helper() +func updatePod(ctx context.Context, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) error { for _, val := range pods { if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{}) @@ -4495,9 +4527,10 @@ func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, _, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{}) return err }); err != nil { - t.Fatalf("Failed to update pod %s: %v", val.Name, err) + return err } } + return nil } func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {