diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index cd911e137b9..3adb3bdf4c9 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -914,10 +914,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	}
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
+	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
 	job.Status.Active = active
 	job.Status.Ready = ready
 	job.Status.Terminating = jobCtx.terminating
-	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index a6ad13fff56..cce3cec7905 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -1690,81 +1690,102 @@ func TestIndexedJob(t *testing.T) {
 }
 
 func TestJobPodReplacementPolicy(t *testing.T) {
-	const podCount int32 = 2
 	indexedCompletion := batchv1.IndexedCompletion
 	nonIndexedCompletion := batchv1.NonIndexedCompletion
 	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
 		return &obj
 	}
-	jobSpecIndexedDefault := &batchv1.JobSpec{
-		Parallelism:    ptr.To(podCount),
-		Completions:    ptr.To(podCount),
-		CompletionMode: &indexedCompletion,
+	type jobStatus struct {
+		active      int
+		failed      int
+		terminating *int32
 	}
 	cases := map[string]struct {
 		podReplacementPolicyEnabled bool
-		deletePods                  bool
-		failPods                    bool
-		wantTerminating             *int32
-		wantFailed                  int
-		wantActive                  int
 		jobSpec                     *batchv1.JobSpec
+		wantStatusAfterDeletion     jobStatus
+		wantStatusAfterFailure      jobStatus
 	}{
-		"feature flag off, delete pods and verify no terminating status": {
-			deletePods: true,
-			jobSpec:    jobSpecIndexedDefault,
-			wantActive: int(podCount),
-			wantFailed: int(podCount),
-		},
-		"feature flag true, delete pods and verify terminating status": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec:                     jobSpecIndexedDefault,
-			wantTerminating:             ptr.To(podCount),
-			wantFailed:                  int(podCount),
-		},
-		"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
+		"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
+				Parallelism:    ptr.To[int32](2),
+				Completions:    ptr.To[int32](2),
+				CompletionMode: &indexedCompletion,
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active: 2,
+				failed: 2,
+			},
+			wantStatusAfterFailure: jobStatus{
+				active: 2,
+				failed: 2,
+			},
+		},
+		"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
 				CompletionMode:       &indexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
 			},
-			wantTerminating: ptr.To(podCount),
-			wantFailed:      int(podCount),
 		},
-		"feature flag true, delete pods, verify terminating status and recreate once failed": {
+		"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &nonIndexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
 			},
-			wantTerminating: ptr.To(podCount),
-		},
-		"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &nonIndexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+			wantStatusAfterDeletion: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
 			},
-			wantTerminating: ptr.To(podCount),
 		},
-		"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
+		"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: false,
-			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
 				CompletionMode:       &nonIndexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
 				PodFailurePolicy: &batchv1.PodFailurePolicy{
 					Rules: []batchv1.PodFailurePolicyRule{
 						{
@@ -1777,33 +1798,60 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 					},
 				},
 			},
-			wantActive: int(podCount),
+			wantStatusAfterDeletion: jobStatus{
+				active: 2,
+			},
+			wantStatusAfterFailure: jobStatus{
+				active: 2,
+			},
 		},
-		"feature flag true, recreate failed pods, and verify active and failed counters": {
+		"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
-			failPods:                    true,
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
 				CompletionMode:       &indexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      0,
+				failed:      0,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
 			},
-			wantActive:      int(podCount),
-			wantFailed:      int(podCount),
-			wantTerminating: ptr.To[int32](0),
 		},
-		"feature flag true with NonIndexedJob, recreate failed pods, and verify active and failed counters": {
+		"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
-			failPods:                    true,
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
 				CompletionMode:       &nonIndexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      0,
+				failed:      0,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
 			},
-			wantActive:      int(podCount),
-			wantFailed:      int(podCount),
-			wantTerminating: ptr.To[int32](0),
 		},
 	}
 	for name, tc := range cases {
@@ -1813,9 +1861,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
 
 			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
-			defer closeFn()
+			t.Cleanup(closeFn)
 			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
-			defer cancel()
+			t.Cleanup(cancel)
 			resetMetrics()
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -1826,41 +1874,24 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			}
 			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
 
-			// Wait for pods to start up.
-			err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
-				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if job.Status.Active == podCount {
-					return true, nil
-				}
-				return false, nil
-			})
-			if err != nil {
-				t.Fatalf("Error waiting for Job pods to become active: %v", err)
-			}
-			if tc.deletePods {
-				err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-					metav1.DeleteOptions{},
-					metav1.ListOptions{
-						Limit: 1000,
-					})
-				if err != nil {
-					t.Fatalf("Failed to delete Pods: %v", err)
-				}
-			}
-			if tc.failPods {
-				err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount))
-				if err != nil {
-					t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-				}
-			}
+			waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
+			t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
+
+			deletePods(ctx, t, clientSet, ns.Name)
 
 			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
-				Terminating: tc.wantTerminating,
-				Failed:      tc.wantFailed,
-				Active:      tc.wantActive,
+				Terminating: tc.wantStatusAfterDeletion.terminating,
+				Failed:      tc.wantStatusAfterDeletion.failed,
+				Active:      tc.wantStatusAfterDeletion.active,
+				Ready:       ptr.To[int32](0),
+			})
+
+			failTerminatingPods(ctx, t, clientSet, ns.Name)
+
+			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+				Terminating: tc.wantStatusAfterFailure.terminating,
+				Failed:      tc.wantStatusAfterFailure.failed,
+				Active:      tc.wantStatusAfterFailure.active,
 				Ready:       ptr.To[int32](0),
 			})
 		})
@@ -3022,3 +3053,80 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri
 	})
 	return job, err
 }
+
+func waitForPodsToBeActive(ctx context.Context, t *testing.T, jobClient typedv1.JobInterface, podCount int32, jobObj *batchv1.Job) {
+	t.Helper()
+	err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(context.Context) (done bool, err error) {
+		job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return job.Status.Active == podCount, nil
+	})
+	if err != nil {
+		t.Fatalf("Error waiting for Job pods to become active: %v", err)
+	}
+}
+
+func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx,
+		metav1.DeleteOptions{},
+		metav1.ListOptions{
+			Limit: 1000,
+		})
+	if err != nil {
+		t.Fatalf("Failed to cleanup Pods: %v", err)
+	}
+}
+
+func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
+		for i, finalizer := range pod.Finalizers {
+			if finalizer == "fake.example.com/blockDeletion" {
+				pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
+			}
+		}
+	})
+}
+
+func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
+	t.Helper()
+	for _, val := range pods {
+		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+			newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			updateFunc(newPod)
+			_, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
+			return err
+		}); err != nil {
+			t.Fatalf("Failed to update pod %s: %v", val.Name, err)
+		}
+	}
+}
+
+func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	var terminatingPods []v1.Pod
+	for _, pod := range pods.Items {
+		if pod.DeletionTimestamp != nil {
+			pod.Status.Phase = v1.PodFailed
+			terminatingPods = append(terminatingPods, pod)
+		}
+	}
+	_, err = updatePodStatuses(ctx, clientSet, terminatingPods)
+	if err != nil {
+		t.Fatalf("Failed to update pod statuses: %v", err)
+	}
+}
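
A note on the job_controller.go hunk, since the one-line move is easy to misread: ptr.Equal(job.Status.Terminating, jobCtx.terminating) can only detect a pending change while job.Status.Terminating still holds the old value. The removed line ran the comparison after the assignment job.Status.Terminating = jobCtx.terminating, so it always reported "equal" and the Terminating count alone never triggered a status update. Below is a minimal standalone Go sketch of that pitfall; the names are illustrative only, and equal merely mirrors the semantics of ptr.Equal from k8s.io/utils/ptr rather than being the controller's actual code.

	package main

	import "fmt"

	// equal reports whether two *int32 are both nil or point at equal values,
	// matching the semantics of ptr.Equal.
	func equal(a, b *int32) bool {
		if a == nil || b == nil {
			return a == b
		}
		return *a == *b
	}

	func main() {
		two := int32(2)
		desired := &two   // stands in for jobCtx.terminating
		var status *int32 // stands in for job.Status.Terminating, not yet published

		// Buggy order (the removed line): mutate first, compare second.
		// Both sides now hold the same value, so no change is ever detected.
		status = desired
		fmt.Println("change detected:", !equal(status, desired)) // false

		// Fixed order (the added line): compare against the old value, then mutate.
		status = nil
		needsUpdate := !equal(status, desired) // true: nil vs 2
		status = desired
		fmt.Println("change detected:", needsUpdate) // true
	}

The integration test exercises exactly this: it asserts Status.Terminating right after the finalizer-blocked pods are deleted (2) and again after they are forced into the Failed phase (0), which only passes if the controller detects both transitions.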