From f2c8030845dc1888963a25a7ecc33d6fe4e74788 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 14 Apr 2022 11:14:14 -0400 Subject: [PATCH 1/4] Integration test for backoff limit and finalizers Change-Id: Ic231ce9a5504d3aae4191901d7eb5fe69bf017ac --- test/integration/job/job_test.go | 99 +++++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 21 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 43b32c048bf..593f37e91f1 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -575,29 +575,76 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { if err != nil { t.Fatalf("Failed to delete job: %v", err) } - orphanPods := 0 - if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { - pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector), - }) - if err != nil { - return false, err - } - orphanPods = 0 - for _, pod := range pods.Items { - if hasJobTrackingFinalizer(&pod) { - orphanPods++ - } - } - return orphanPods == 0, nil - }); err != nil { - t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err) - t.Logf("Last saw %d orphan pods", orphanPods) - } + validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) }) } } +func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() + + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobController(restConfig) + defer func() { + cancel() + }() + + // Job tracking with finalizers requires less calls in Indexed mode, + // so it's more likely to process all finalizers before all the pods + // are visible. + mode := batchv1.IndexedCompletion + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + CompletionMode: &mode, + Completions: pointer.Int32(500), + Parallelism: pointer.Int32(500), + BackoffLimit: pointer.Int32(0), + }, + }) + if err != nil { + t.Fatalf("Could not create job: %v", err) + } + + // Fail a pod ASAP. 
+ err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Could not fail pod: %v", err) + } + + validateJobFailed(ctx, t, clientSet, jobObj) + + validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) +} + +func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + orphanPods := 0 + if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { + pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector), + }) + if err != nil { + return false, err + } + orphanPods = 0 + for _, pod := range pods.Items { + if hasJobTrackingFinalizer(&pod) { + orphanPods++ + } + } + return orphanPods == 0, nil + }); err != nil { + t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err) + t.Logf("Last saw %d orphan pods", orphanPods) + } +} + func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) { // Step 0: job created while feature is enabled. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() @@ -974,16 +1021,26 @@ func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1. return "" } +func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed) +} + func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) { + t.Helper() + validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete) +} + +func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) { t.Helper() if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) { j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to obtain updated Job: %v", err) } - return getJobConditionStatus(ctx, j, batchv1.JobComplete) == v1.ConditionTrue, nil + return getJobConditionStatus(ctx, j, cond) == v1.ConditionTrue, nil }); err != nil { - t.Errorf("Waiting for Job to succeed: %v", err) + t.Errorf("Waiting for Job to have condition %s: %v", cond, err) } } From 53aa05df3ae7b2db0894699d5b8103fcff44e81c Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 14 Apr 2022 15:07:09 -0400 Subject: [PATCH 2/4] Don't mark job as failed until expectations are satisfied Change-Id: I99206f35f6f145054c005ab362c792e71b9b15f4 --- pkg/controller/job/job_controller.go | 8 ++++---- pkg/controller/job/job_controller_test.go | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 01be1a951f7..e75162f446c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -725,7 +725,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. 
If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. - jobNeedsSync := jm.expectations.SatisfiedExpectations(key) + satisfiedExpectations := jm.expectations.SatisfiedExpectations(key) pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil) if err != nil { @@ -782,9 +782,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr if uncounted == nil { // Legacy behavior: pretend all active pods were successfully removed. deleted = active - } else if deleted != active { + } else if deleted != active || !satisfiedExpectations { // Can't declare the Job as finished yet, as there might be remaining - // pod finalizers. + // pod finalizers or pods that are not in the informer's cache yet. finishedCondition = nil } active -= deleted @@ -792,7 +792,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr manageJobErr = err } else { manageJobCalled := false - if jobNeedsSync && job.DeletionTimestamp == nil { + if satisfiedExpectations && job.DeletionTimestamp == nil { active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes) manageJobCalled = true } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9f406c9011b..93e4ac78617 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -196,6 +196,9 @@ func TestControllerSyncJob(t *testing.T) { wasSuspended bool suspend bool + // If set, it means that the case is exclusive to tracking with/without finalizers. + wFinalizersExclusive *bool + // pod setup podControllerError error jobKeyForget bool @@ -480,6 +483,16 @@ func TestControllerSyncJob(t *testing.T) { expectedConditionReason: "BackoffLimitExceeded", expectedPodPatches: 1, }, + "job failures, unsatisfied expectations": { + wFinalizersExclusive: pointer.Bool(true), + parallelism: 2, + completions: 5, + deleting: true, + failedPods: 1, + fakeExpectationAtCreation: 1, + expectedFailed: 1, + expectedPodPatches: 1, + }, "indexed job start": { parallelism: 2, completions: 5, @@ -706,6 +719,9 @@ func TestControllerSyncJob(t *testing.T) { if wFinalizers && tc.podControllerError != nil { t.Skip("Can't track status if finalizers can't be removed") } + if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers { + t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive) + } defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() From 09caa367182320a1d8d96588ea05d38b609c2f06 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 14 Apr 2022 18:35:19 -0400 Subject: [PATCH 3/4] Fix removing finalizer from finished jobs In some rare race conditions, the job controller might create new pods after the job is declared finished. 
Change-Id: I8a00429c8845463259cd7f82bb3c241d0011583c --- pkg/controller/job/job_controller.go | 5 +- pkg/controller/job/job_controller_test.go | 69 ++++++++++++++++------- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index e75162f446c..158c5022cc5 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -384,7 +384,8 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { return } job := jm.resolveControllerRef(pod.Namespace, controllerRef) - if job == nil { + if job == nil || IsJobFinished(job) { + // syncJob will not remove this finalizer. if hasFinalizer { jm.enqueueOrphanPod(pod) } @@ -585,7 +586,7 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { // Make sure the pod is still orphaned. if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil { job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef) - if job != nil { + if job != nil && !IsJobFinished(job) { // The pod was adopted. Do not remove finalizer. return nil } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 93e4ac78617..4e054335297 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -2596,32 +2596,63 @@ func TestWatchOrphanPods(t *testing.T) { jobSynced = true return true, nil } - - // Create job but don't add it to the store. - testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) - stopCh := make(chan struct{}) defer close(stopCh) go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) go manager.Run(context.TODO(), 1) - orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod - orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Creating orphan pod: %v", err) + // Create job but don't add it to the store. 
+ cases := map[string]struct { + job *batch.Job + inCache bool + wantJobSynced bool + }{ + "job_does_not_exist": { + job: newJob(2, 2, 6, batch.NonIndexedCompletion), + }, + "orphan": {}, + "job_finished": { + job: func() *batch.Job { + j := newJob(2, 2, 6, batch.NonIndexedCompletion) + j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{ + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }) + return j + }(), + inCache: true, + }, } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + jobSynced = false + if tc.inCache { + sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job) + } - if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - return !hasJobTrackingFinalizer(p), nil - }); err != nil { - t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) - } - if jobSynced { - t.Error("Tried to sync deleted job") + podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() + if tc.job != nil { + podBuilder = podBuilder.job(tc.job) + } + orphanPod := podBuilder.Pod + orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Creating orphan pod: %v", err) + } + + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return !hasJobTrackingFinalizer(p), nil + }); err != nil { + t.Errorf("Waiting for Pod to get the finalizer removed: %v", err) + } + if !tc.inCache && jobSynced { + t.Error("Tried to sync deleted job") + } + }) } } From 12568860cb2b352b3bf4391d994fd8266c72d2f2 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 20 Apr 2022 16:38:17 -0400 Subject: [PATCH 4/4] Test Foreground deletion in job integration Change-Id: Ia6e86da5e66422fdb653c1ee60864a1c79233ea6 --- test/integration/job/job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 593f37e91f1..02ce01d552b 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -535,7 +535,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) { func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() - for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground} { + for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} { t.Run(string(policy), func(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn()
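
Reviewer note (illustrative, not part of the patches): patches 2 and 3 both hinge on whether the controller considers a Job already finished before it acts — syncJob now defers the Failed condition until pod expectations are satisfied, and deletePod/syncOrphanPod route pods of finished Jobs through the orphan path so their tracking finalizer is still removed. The sketch below shows the kind of terminal-condition check that IsJobFinished performs, assuming the usual definition that a Job is finished once it carries a Complete or Failed condition with status True; the helper name jobIsFinished and the standalone main are illustrative only, not the controller's actual code.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// jobIsFinished reports whether the Job carries a terminal condition
// (Complete or Failed with status True) — a sketch of the check the
// controller relies on before routing pods through the orphan-pod path.
func jobIsFinished(job *batchv1.Job) bool {
	for _, c := range job.Status.Conditions {
		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	// Build a Job that already has a Complete condition, as the
	// "job_finished" case in TestWatchOrphanPods does.
	job := &batchv1.Job{}
	job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{
		Type:   batchv1.JobComplete,
		Status: v1.ConditionTrue,
	})
	fmt.Println("finished:", jobIsFinished(job))
}

Once this check returns true, syncJob will not remove the tracking finalizer from the Job's pods (per the comment added in patch 3), so any such pod must be enqueued and cleaned up by syncOrphanPod — which is exactly the path the new "job_finished" case in TestWatchOrphanPods exercises.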