diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 01be1a951f7..158c5022cc5 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -384,7 +384,8 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { return } job := jm.resolveControllerRef(pod.Namespace, controllerRef) - if job == nil { + if job == nil || IsJobFinished(job) { + // syncJob will not remove this finalizer. if hasFinalizer { jm.enqueueOrphanPod(pod) } @@ -585,7 +586,7 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { // Make sure the pod is still orphaned. if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil { job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef) - if job != nil { + if job != nil && !IsJobFinished(job) { // The pod was adopted. Do not remove finalizer. return nil } @@ -725,7 +726,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. - jobNeedsSync := jm.expectations.SatisfiedExpectations(key) + satisfiedExpectations := jm.expectations.SatisfiedExpectations(key) pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil) if err != nil { @@ -782,9 +783,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr if uncounted == nil { // Legacy behavior: pretend all active pods were successfully removed. deleted = active - } else if deleted != active { + } else if deleted != active || !satisfiedExpectations { // Can't declare the Job as finished yet, as there might be remaining - // pod finalizers. + // pod finalizers or pods that are not in the informer's cache yet. finishedCondition = nil } active -= deleted @@ -792,7 +793,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr manageJobErr = err } else { manageJobCalled := false - if jobNeedsSync && job.DeletionTimestamp == nil { + if satisfiedExpectations && job.DeletionTimestamp == nil { active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes) manageJobCalled = true } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9f406c9011b..4e054335297 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -196,6 +196,9 @@ func TestControllerSyncJob(t *testing.T) { wasSuspended bool suspend bool + // If set, it means that the case is exclusive to tracking with/without finalizers. + wFinalizersExclusive *bool + // pod setup podControllerError error jobKeyForget bool @@ -480,6 +483,16 @@ func TestControllerSyncJob(t *testing.T) { expectedConditionReason: "BackoffLimitExceeded", expectedPodPatches: 1, }, + "job failures, unsatisfied expectations": { + wFinalizersExclusive: pointer.Bool(true), + parallelism: 2, + completions: 5, + deleting: true, + failedPods: 1, + fakeExpectationAtCreation: 1, + expectedFailed: 1, + expectedPodPatches: 1, + }, "indexed job start": { parallelism: 2, completions: 5, @@ -706,6 +719,9 @@ func TestControllerSyncJob(t *testing.T) { if wFinalizers && tc.podControllerError != nil { t.Skip("Can't track status if finalizers can't be removed") } + if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers { + t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive) + } defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() @@ -2580,32 +2596,63 @@ func TestWatchOrphanPods(t *testing.T) { jobSynced = true return true, nil } - - // Create job but don't add it to the store. - testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) - stopCh := make(chan struct{}) defer close(stopCh) go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) go manager.Run(context.TODO(), 1) - orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod - orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Creating orphan pod: %v", err) + // Create job but don't add it to the store. + cases := map[string]struct { + job *batch.Job + inCache bool + wantJobSynced bool + }{ + "job_does_not_exist": { + job: newJob(2, 2, 6, batch.NonIndexedCompletion), + }, + "orphan": {}, + "job_finished": { + job: func() *batch.Job { + j := newJob(2, 2, 6, batch.NonIndexedCompletion) + j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{ + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }) + return j + }(), + inCache: true, + }, } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + jobSynced = false + if tc.inCache { + sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) + } - if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - return !hasJobTrackingFinalizer(p), nil - }); err != nil { - t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) - } - if jobSynced { - t.Error("Tried to sync deleted job") + podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() + if tc.job != nil { + podBuilder = podBuilder.job(tc.job) + } + orphanPod := podBuilder.Pod + orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Creating orphan pod: %v", err) + } + + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return !hasJobTrackingFinalizer(p), nil + }); err != nil { + t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) + } + if !tc.inCache && jobSynced { + t.Error("Tried to sync deleted job") + } + }) } } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 7c5fef73be4..5aa2d6cb758 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -535,7 +535,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) { func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() - for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground} { + for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} { t.Run(string(policy), func(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() @@ -575,29 +575,76 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { if err != nil { t.Fatalf("Failed to delete job: %v", err) } - orphanPods := 0 - if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { - pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector), - }) - if err != nil { - return false, err - } - orphanPods = 0 - for _, pod := range pods.Items { - if hasJobTrackingFinalizer(&pod) { - orphanPods++ - } - } - return orphanPods == 0, nil - }); err != nil { - t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err) - t.Logf("Last saw %d orphan pods", orphanPods) - } + validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) }) } } +func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() + + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobController(restConfig) + defer func() { + cancel() + }() + + // Job tracking with finalizers requires less calls in Indexed mode, + // so it's more likely to process all finalizers before all the pods + // are visible. + mode := batchv1.IndexedCompletion + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + CompletionMode: &mode, + Completions: pointer.Int32(500), + Parallelism: pointer.Int32(500), + BackoffLimit: pointer.Int32(0), + }, + }) + if err != nil { + t.Fatalf("Could not create job: %v", err) + } + + // Fail a pod ASAP. + err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Could not fail pod: %v", err) + } + + validateJobFailed(ctx, t, clientSet, jobObj) + + validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) +} + +func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + orphanPods := 0 + if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { + pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector), + }) + if err != nil { + return false, err + } + orphanPods = 0 + for _, pod := range pods.Items { + if hasJobTrackingFinalizer(&pod) { + orphanPods++ + } + } + return orphanPods == 0, nil + }); err != nil { + t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err) + t.Logf("Last saw %d orphan pods", orphanPods) + } +} + func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) { // Step 0: job created while feature is enabled. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() @@ -974,16 +1021,26 @@ func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1. return "" } +func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed) +} + func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete) +} + +func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) { t.Helper() if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to obtain updated Job: %v", err) } - return getJobConditionStatus(ctx, j, batchv1.JobComplete) == v1.ConditionTrue, nil + return getJobConditionStatus(ctx, j, cond) == v1.ConditionTrue, nil }); err != nil { - t.Errorf("Waiting for Job to succeed: %v", err) + t.Errorf("Waiting for Job to have condition %s: %v", cond, err) } }