diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 4a3f8c71246..feab363f7b5 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -1023,7 +1023,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 		if podFinished || podTerminating || job.DeletionTimestamp != nil {
 			podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
 		}
-		if pod.Status.Phase == v1.PodSucceeded {
+		if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) {
 			if isIndexed {
 				// The completion index is enough to avoid recounting succeeded pods.
 				// No need to track UIDs.
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 79227a3b06d..a2c0954debc 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -1613,6 +1613,32 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 				},
 			},
 		},
+		"pod flips from failed to succeeded": {
+			job: batch.Job{
+				Spec: batch.JobSpec{
+					Completions: pointer.Int32(2),
+					Parallelism: pointer.Int32(2),
+				},
+				Status: batch.JobStatus{
+					UncountedTerminatedPods: &batch.UncountedTerminatedPods{
+						Failed: []types.UID{"a", "b"},
+					},
+				},
+			},
+			pods: []*v1.Pod{
+				buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod,
+				buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
+			},
+			finishedCond:     failedCond,
+			wantRmFinalizers: 2,
+			wantStatusUpdates: []batch.JobStatus{
+				{
+					UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
+					Failed:                  2,
+					Conditions:              []batch.JobCondition{*failedCond},
+				},
+			},
+		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 143fab92513..2428cf75ec9 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -23,6 +23,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -90,7 +91,7 @@ func TestNonParallelJob(t *testing.T) {
 	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
 
 	// Failed Pod is replaced.
-	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
 	}
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -104,7 +105,7 @@
 	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
 
 	// No more Pods are created after the Pod succeeds.
-	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 	}
 	validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -160,7 +161,7 @@ func TestParallelJob(t *testing.T) {
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 
 		// Tracks ready pods, if enabled.
-		if err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
+		if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
 			t.Fatalf("Failed Marking Pods as ready: %v", err)
 		}
 		if tc.enableReadyPods {
@@ -169,7 +170,7 @@
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 		// Failed Pods are replaced.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 		}
 		want = podsByStatus{
@@ -181,7 +182,7 @@
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 		// Once one Pod succeeds, no more Pods are created, even if some fail.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 		}
 		want = podsByStatus{
@@ -193,7 +194,7 @@
 			want.Ready = pointer.Int32(0)
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 		}
 		want = podsByStatus{
@@ -206,7 +207,7 @@
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 		// No more Pods are created after remaining Pods succeed.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
 		}
 		validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -270,7 +271,7 @@ func TestParallelJobParallelism(t *testing.T) {
 	}, wFinalizers)
 
 	// Succeed Job
-	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 	}
 	validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -332,7 +333,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 
 		// Tracks ready pods, if enabled.
-		if err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
+		if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
 			t.Fatalf("Failed Marking Pods as ready: %v", err)
 		}
 		if tc.enableReadyPods {
@@ -341,7 +342,7 @@
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 
 		// Failed Pods are replaced.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 		}
 		want = podsByStatus{
@@ -353,7 +354,7 @@
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 		// Pods are created until the number of succeeded Pods equals completions.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 		}
 		want = podsByStatus{
@@ -366,7 +367,7 @@
 		}
 		validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
 		// No more Pods are created after the Job completes.
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
 			t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
 		}
 		validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -439,7 +440,7 @@ func TestIndexedJob(t *testing.T) {
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
 
 	// Remaining Pods succeed.
-	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
 		t.Fatal("Failed trying to succeed remaining pods")
 	}
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -491,7 +492,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	cancel()
 
 	// Fail a pod while Job controller is stopped.
-	if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
+	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
 	}
 
@@ -518,7 +519,7 @@
 	cancel()
 
 	// Succeed a pod while Job controller is stopped.
-	if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
 	}
 
@@ -607,7 +608,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 
 	// Fail a pod ASAP.
 	err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
-		if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
 			return false, nil
 		}
 		return true, nil
@@ -621,6 +622,57 @@
 	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
 }
 
+// TestJobFailedWithInterrupts tests that a job where one pod fails and the rest
+// succeed is marked as Failed, even if the controller fails in the middle.
+func TestJobFailedWithInterrupts(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
+
+	closeFn, restConfig, clientSet, ns := setup(t, "simple")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer func() {
+		cancel()
+	}()
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			Completions:  pointer.Int32(10),
+			Parallelism:  pointer.Int32(10),
+			BackoffLimit: pointer.Int32(0),
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					NodeName: "foo", // Scheduled pods are not deleted immediately.
+				},
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Could not create job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 10,
+		Ready:  pointer.Int32(0),
+	}, true)
+	t.Log("Finishing pods")
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+		t.Fatalf("Could not fail a pod: %v", err)
+	}
+	remaining := 9
+	if err := wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+		if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
+			remaining -= succ
+			t.Logf("Transient failure succeeding pods: %v", err)
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
+	}
+	t.Log("Recreating job controller")
+	cancel()
+	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
+}
+
 func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
 	t.Helper()
 	orphanPods := 0
@@ -1041,7 +1093,7 @@ func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset
 	}
 }
 
-func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error {
+func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
 	op := func(p *v1.Pod) bool {
 		p.Status.Phase = phase
 		return true
@@ -1049,7 +1101,7 @@
 	return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
 }
 
-func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) error {
+func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
 	op := func(p *v1.Pod) bool {
 		if podutil.IsPodReady(p) {
 			return false
@@ -1063,10 +1115,10 @@ func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj
 	return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
 }
 
-func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) error {
+func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
 	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
-		return fmt.Errorf("listing Job Pods: %w", err)
+		return fmt.Errorf("listing Job Pods: %w", err), 0
 	}
 	updates := make([]v1.Pod, 0, cnt)
 	for _, pod := range pods.Items {
@@ -1081,15 +1133,16 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
 		}
 	}
 	if len(updates) != cnt {
-		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
+		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt), 0
 	}
 	return updatePodStatuses(ctx, clientSet, updates)
 }
 
-func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (error, int) {
 	wg := sync.WaitGroup{}
 	wg.Add(len(updates))
 	errCh := make(chan error, len(updates))
+	var updated int32
 
 	for _, pod := range updates {
 		pod := pod
@@ -1097,6 +1150,8 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
 			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
 			if err != nil {
 				errCh <- err
+			} else {
+				atomic.AddInt32(&updated, 1)
 			}
 			wg.Done()
 		}()
 	}
@@ -1105,10 +1160,10 @@
 
 	select {
 	case err := <-errCh:
-		return fmt.Errorf("updating Pod status: %w", err)
+		return fmt.Errorf("updating Pod status: %w", err), int(updated)
 	default:
 	}
-	return nil
+	return nil, int(updated)
 }
 
 func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
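
Reviewer note, not part of the patch: the one-line controller change above keeps a pod that is already recorded as failed in the Job status from also being counted as succeeded when its phase later flips. Below is a minimal, self-contained Go sketch of that guard using simplified stand-in types; countTerminated and its map arguments are hypothetical names for illustration only, not identifiers from the controller.

package main

import "fmt"

// countTerminated mimics the accounting guard added in
// trackJobStatusAndRemoveFinalizers: once a pod UID is already tracked as
// failed, a later Succeeded phase must not move it to the succeeded bucket.
func countTerminated(phases map[string]string, uncountedFailed map[string]bool) (succeeded, failed int) {
	for uid, phase := range phases {
		switch {
		case uncountedFailed[uid]:
			// Already recorded as failed; keep counting it as failed even if
			// the reported phase flipped to Succeeded in the meantime.
			failed++
		case phase == "Succeeded":
			succeeded++
		case phase == "Failed":
			failed++
		}
	}
	return succeeded, failed
}

func main() {
	// Mirrors the "pod flips from failed to succeeded" unit test: both pods
	// are already listed as failed in the Job status, so the expected
	// result is failed=2, succeeded=0.
	phases := map[string]string{"a": "Failed", "b": "Succeeded"}
	uncountedFailed := map[string]bool{"a": true, "b": true}
	s, f := countTerminated(phases, uncountedFailed)
	fmt.Printf("succeeded=%d failed=%d\n", s, f)
}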