From 6dfaeff33cbd90562a4984244ae4bb8b3ce52a2f Mon Sep 17 00:00:00 2001 From: kannon92 Date: Wed, 21 Dec 2022 22:11:27 +0000 Subject: [PATCH] Remove Legacy Job Tracking --- pkg/controller/job/indexed_job_utils.go | 8 +- pkg/controller/job/indexed_job_utils_test.go | 62 +- pkg/controller/job/job_controller.go | 153 +---- pkg/controller/job/job_controller_test.go | 622 +++++++------------ test/integration/job/job_test.go | 31 +- 5 files changed, 284 insertions(+), 592 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 9866e06941d..11ee7e08745 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -52,17 +52,13 @@ type orderedIntervals []interval // empty list if this Job is not tracked with finalizers. The new list includes // the indexes that succeeded since the last sync. func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { - var prevIntervals orderedIntervals - withFinalizers := hasJobTrackingAnnotation(job) - if withFinalizers { - prevIntervals = succeededIndexesFromJob(job) - } + prevIntervals := succeededIndexesFromJob(job) newSucceeded := sets.NewInt() for _, p := range pods { ix := getCompletionIndex(p.Annotations) // Succeeded Pod with valid index and, if tracking with finalizers, // has a finalizer (meaning that it is not counted yet). - if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && (!withFinalizers || hasJobTrackingFinalizer(p)) { + if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) { newSucceeded.Insert(ix) } } diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index 519bf3d7dc3..0b64ce5cd74 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -29,12 +29,11 @@ const noIndex = "-" func TestCalculateSucceededIndexes(t *testing.T) { cases := map[string]struct { - prevSucceeded string - pods []indexPhase - completions int32 - trackingWithFinalizers bool - wantStatusIntervals orderedIntervals - wantIntervals orderedIntervals + prevSucceeded string + pods []indexPhase + completions int32 + wantStatusIntervals orderedIntervals + wantIntervals orderedIntervals }{ "one index": { pods: []indexPhase{{"1", v1.PodSucceeded}}, @@ -65,19 +64,6 @@ func TestCalculateSucceededIndexes(t *testing.T) { completions: 8, wantIntervals: []interval{{2, 3}, {5, 7}}, }, - "one interval, ignore previous": { - prevSucceeded: "3-5", - pods: []indexPhase{ - {"0", v1.PodSucceeded}, - {"1", v1.PodFailed}, - {"1", v1.PodSucceeded}, - {"2", v1.PodSucceeded}, - {"2", v1.PodSucceeded}, - {"3", v1.PodFailed}, - }, - completions: 4, - wantIntervals: []interval{{0, 2}}, - }, "one index and one interval": { pods: []indexPhase{ {"0", v1.PodSucceeded}, @@ -107,18 +93,16 @@ func TestCalculateSucceededIndexes(t *testing.T) { wantIntervals: []interval{{0, 2}, {4, 4}}, }, "prev interval out of range": { - prevSucceeded: "0-5,8-10", - completions: 8, - trackingWithFinalizers: true, - wantStatusIntervals: []interval{{0, 5}}, - wantIntervals: []interval{{0, 5}}, + prevSucceeded: "0-5,8-10", + completions: 8, + wantStatusIntervals: []interval{{0, 5}}, + wantIntervals: []interval{{0, 5}}, }, "prev interval partially out of range": { - prevSucceeded: "0-5,8-10", - completions: 10, - trackingWithFinalizers: true, - 
wantStatusIntervals: []interval{{0, 5}, {8, 9}}, - wantIntervals: []interval{{0, 5}, {8, 9}}, + prevSucceeded: "0-5,8-10", + completions: 10, + wantStatusIntervals: []interval{{0, 5}, {8, 9}}, + wantIntervals: []interval{{0, 5}, {8, 9}}, }, "prev and new separate": { prevSucceeded: "0,4,5,10-12", @@ -127,8 +111,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"7", v1.PodSucceeded}, {"8", v1.PodSucceeded}, }, - completions: 13, - trackingWithFinalizers: true, + completions: 13, wantStatusIntervals: []interval{ {0, 0}, {4, 5}, @@ -149,8 +132,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"7", v1.PodSucceeded}, {"8", v1.PodSucceeded}, }, - completions: 9, - trackingWithFinalizers: true, + completions: 9, wantStatusIntervals: []interval{ {3, 4}, {6, 6}, @@ -167,8 +149,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"4", v1.PodSucceeded}, {"6", v1.PodSucceeded}, }, - completions: 9, - trackingWithFinalizers: true, + completions: 9, wantStatusIntervals: []interval{ {2, 2}, {7, 8}, @@ -186,8 +167,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"5", v1.PodSucceeded}, {"9", v1.PodSucceeded}, }, - completions: 10, - trackingWithFinalizers: true, + completions: 10, wantStatusIntervals: []interval{ {2, 7}, }, @@ -202,8 +182,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { pods: []indexPhase{ {"3", v1.PodSucceeded}, }, - completions: 4, - trackingWithFinalizers: true, + completions: 4, wantStatusIntervals: []interval{ {0, 0}, }, @@ -223,11 +202,6 @@ func TestCalculateSucceededIndexes(t *testing.T) { Completions: pointer.Int32(tc.completions), }, } - if tc.trackingWithFinalizers { - job.Annotations = map[string]string{ - batch.JobTrackingFinalizer: "", - } - } pods := hollowPodsWithIndexPhase(tc.pods) for _, p := range pods { p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index ad6b590ab37..6c7fd03ed24 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -614,9 +614,9 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { // getPodsForJob returns the set of pods that this Job should manage. // It also reconciles ControllerRef by adopting/orphaning, adding tracking -// finalizers, if enabled. +// finalizers. // Note that the returned Pods are pointers into the cache. -func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) { +func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) if err != nil { return nil, fmt.Errorf("couldn't convert Job selector: %v", err) @@ -639,14 +639,10 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinal } return fresh, nil }) - var finalizers []string - if withFinalizers { - finalizers = append(finalizers, batch.JobTrackingFinalizer) - } - cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...) + cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, batch.JobTrackingFinalizer) // When adopting Pods, this operation adds an ownerRef and finalizers. pods, err = cm.ClaimPods(ctx, pods) - if err != nil || !withFinalizers { + if err != nil { return pods, err } // Set finalizer on adopted pods for the remaining calculations. 
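For context on the interval bookkeeping exercised by TestCalculateSucceededIndexes above: an Indexed Job records its completed indexes in .status.completedIndexes as ordered, non-overlapping intervals (e.g. "0-2,4"), and each sync merges the indexes of newly succeeded Pods into the previously recorded intervals. Below is a minimal, self-contained sketch of that merge; the type and function names are illustrative, not the controller's own, and it expands intervals into a set for simplicity where the real code stays in interval form.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// interval is a closed range of completion indexes; {0, 2} renders as "0-2".
type interval struct{ first, last int }

// merge folds newly succeeded indexes into the previously recorded intervals
// and returns a fresh ordered, non-overlapping list.
func merge(prev []interval, newSucceeded []int) []interval {
	// Expand everything into a set of indexes. Fine for a sketch; the real
	// controller works on intervals throughout.
	seen := map[int]bool{}
	for _, iv := range prev {
		for i := iv.first; i <= iv.last; i++ {
			seen[i] = true
		}
	}
	for _, i := range newSucceeded {
		seen[i] = true
	}
	idxs := make([]int, 0, len(seen))
	for i := range seen {
		idxs = append(idxs, i)
	}
	sort.Ints(idxs)
	var out []interval
	for _, i := range idxs {
		if n := len(out); n > 0 && out[n-1].last+1 == i {
			out[n-1].last = i // extend the current run
		} else {
			out = append(out, interval{i, i}) // start a new run
		}
	}
	return out
}

// render prints intervals the way .status.completedIndexes does, e.g. "0-2,4".
func render(ivs []interval) string {
	parts := make([]string, 0, len(ivs))
	for _, iv := range ivs {
		if iv.first == iv.last {
			parts = append(parts, fmt.Sprintf("%d", iv.first))
		} else {
			parts = append(parts, fmt.Sprintf("%d-%d", iv.first, iv.last))
		}
	}
	return strings.Join(parts, ",")
}

func main() {
	prev := []interval{{0, 0}, {4, 5}}
	fmt.Println(render(merge(prev, []int{1, 6}))) // prints "0-1,4-6"
}

Keeping the status in interval form is what lets the field stay small even for Indexed Jobs with very large completion counts.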
@@ -718,23 +714,18 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc() }() - var expectedRmFinalizers sets.String - var uncounted *uncountedTerminatedPods - if hasJobTrackingAnnotation(&job) { - klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job)) - if job.Status.UncountedTerminatedPods == nil { - job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} - } - uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) - expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) + if job.Status.UncountedTerminatedPods == nil { + job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } + uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key) // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. satisfiedExpectations := jm.expectations.SatisfiedExpectations(key) - pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil) + pods, err := jm.getPodsForJob(ctx, &job) if err != nil { return false, err } @@ -767,13 +758,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil { finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now()) } else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil { - if uncounted != nil { - // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed. - finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) - } else { - // Prepare the Failed job condition for the legacy path without finalizers (don't use the interim FailureTarget condition). - finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) - } + // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed. + finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) } } if finishedCondition == nil { @@ -799,16 +785,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr // Remove active pods if Job failed. if finishedCondition != nil { deleted, err := jm.deleteActivePods(ctx, &job, activePods) - if uncounted == nil { - // Legacy behavior: pretend all active pods were successfully removed. - deleted = active - } else if deleted != active || !satisfiedExpectations { + if deleted != active || !satisfiedExpectations { // Can't declare the Job as finished yet, as there might be remaining // pod finalizers or pods that are not in the informer's cache yet. 
finishedCondition = nil } active -= deleted - failed += deleted manageJobErr = err } else { manageJobCalled := false @@ -872,56 +854,19 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr // In this case, we should clear the backoff delay. forget = job.Status.Succeeded < succeeded - if uncounted != nil { - needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) - job.Status.Active = active - job.Status.Ready = ready - err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) - if err != nil { - return false, fmt.Errorf("tracking status: %w", err) - } - jobFinished := IsJobFinished(&job) - if jobHasNewFailure && !jobFinished { - // returning an error will re-enqueue Job after the backoff period - return forget, fmt.Errorf("failed pod(s) detected for job key %q", key) - } - forget = true - return forget, manageJobErr + needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) + job.Status.Active = active + job.Status.Ready = ready + err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) + if err != nil { + return false, fmt.Errorf("tracking status: %w", err) } - // Legacy path: tracking without finalizers. - - // Ensure that there are no leftover tracking finalizers. - if err := jm.removeTrackingFinalizersFromAllPods(ctx, pods); err != nil { - return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err) + jobFinished := IsJobFinished(&job) + if jobHasNewFailure && !jobFinished { + // returning an error will re-enqueue Job after the backoff period + return forget, fmt.Errorf("failed pod(s) detected for job key %q", key) } - - // no need to update the job if the status hasn't changed since last time - if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || !equalReady(job.Status.Ready, ready) || suspendCondChanged || finishedCondition != nil { - job.Status.Active = active - job.Status.Succeeded = succeeded - job.Status.Failed = failed - job.Status.Ready = ready - if isIndexedJob(&job) { - job.Status.CompletedIndexes = succeededIndexes.String() - } - job.Status.UncountedTerminatedPods = nil - jobFinished := jm.enactJobFinished(&job, finishedCondition) - - if _, err := jm.updateStatusHandler(ctx, &job); err != nil { - return forget, err - } - if jobFinished { - jm.recordJobFinished(&job, finishedCondition) - } - - if jobHasNewFailure && !IsJobFinished(&job) { - // returning an error will re-enqueue Job after the backoff period - return forget, fmt.Errorf("failed pod(s) detected for job key %q", key) - } - - forget = true - } - + forget = true return forget, manageJobErr } @@ -986,23 +931,6 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey return successfulDeletes, errorFromChannel(errCh) } -// removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called -// when Job tracking with finalizers is disabled. 
-func (jm *Controller) removeTrackingFinalizersFromAllPods(ctx context.Context, pods []*v1.Pod) error { - var podsWithFinalizer []*v1.Pod - for _, pod := range pods { - if hasJobTrackingFinalizer(pod) { - podsWithFinalizer = append(podsWithFinalizer, pod) - } - } - if len(podsWithFinalizer) == 0 { - return nil - } - // Tracking with finalizers is disabled, no need to set expectations. - _, err := jm.removeTrackingFinalizerFromPods(ctx, "", podsWithFinalizer) - return err -} - // trackJobStatusAndRemoveFinalizers does: // 1. Add finished Pods to .status.uncountedTerminatedPods // 2. Remove the finalizers from the Pods if they completed or were removed @@ -1053,7 +981,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // considerTerminated || job.DeletionTimestamp != nil considerTerminated = podutil.IsPodTerminal(pod) || finishedCond != nil || // The Job is terminating. Any running Pod is considered failed. - isPodFailed(pod, job, true /* using finalizers */) + isPodFailed(pod, job) } if podutil.IsPodTerminal(pod) || considerTerminated || job.DeletionTimestamp != nil { podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod) @@ -1361,7 +1289,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s return nil } for _, p := range pods { - if isPodFailed(p, job, uncounted != nil) { + if isPodFailed(p, job) { jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p) if jobFailureMessage != nil { return jobFailureMessage @@ -1374,22 +1302,20 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s // getStatus returns number of succeeded and failed pods running a job. The number // of failed pods can be affected by the podFailurePolicy. func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) { - if uncounted != nil { - succeeded = job.Status.Succeeded - failed = job.Status.Failed - } + succeeded = job.Status.Succeeded + failed = job.Status.Failed succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded })) failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { - if !isPodFailed(p, job, uncounted != nil) { + if !isPodFailed(p, job) { return false } _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p) return countFailed } else { - return isPodFailed(p, job, uncounted != nil) + return isPodFailed(p, job) } })) return succeeded, failed @@ -1487,9 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods if isIndexedJob(job) { addCompletionIndexEnvVariables(podTemplate) } - if hasJobTrackingAnnotation(job) { - podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers) - } + podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers) // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // and double with each successful iteration in a kind of "slow start". 
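The "slow start" mentioned in the comment above batches pod creations: batch sizes begin at SlowStartInitialBatchSize and double after every batch in which all creates succeeded, so a Job whose creates are being rejected stops after a handful of calls instead of issuing parallelism-many doomed requests. A rough, self-contained sketch of that batching loop follows, with a fake create function and illustrative names; it is not the controller's actual helper.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const slowStartInitialBatchSize = 1 // matches the controller's SlowStartInitialBatchSize

// slowStart issues up to remaining create() calls in batches that double in
// size (1, 2, 4, ...) for as long as every call in the previous batch
// succeeded. It returns the number of successful calls; after the first batch
// containing a failure it stops and leaves the rest for a later sync.
func slowStart(remaining int, create func() error) int {
	succeeded := 0
	for batch := slowStartInitialBatchSize; remaining > 0; batch *= 2 {
		if batch > remaining {
			batch = remaining
		}
		var wg sync.WaitGroup
		errs := make(chan error, batch)
		for i := 0; i < batch; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := create(); err != nil {
					errs <- err
				}
			}()
		}
		wg.Wait()
		close(errs)
		failed := len(errs)
		succeeded += batch - failed
		if failed > 0 {
			return succeeded
		}
		remaining -= batch
	}
	return succeeded
}

func main() {
	var calls int64
	n := slowStart(10, func() error {
		atomic.AddInt64(&calls, 1) // pretend every create succeeds
		return nil
	})
	fmt.Println(n, atomic.LoadInt64(&calls)) // 10 10, issued in batches of 1, 2, 4, 3
}
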
@@ -1646,14 +1570,6 @@ func getCompletionMode(job *batch.Job) string { return string(batch.NonIndexedCompletion) } -func hasJobTrackingAnnotation(job *batch.Job) bool { - if job.Annotations == nil { - return false - } - _, ok := job.Annotations[batch.JobTrackingFinalizer] - return ok -} - func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string { for _, fin := range finalizers { if fin == batch.JobTrackingFinalizer { @@ -1739,7 +1655,7 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio return list, false } -func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool { +func isPodFailed(p *v1.Pod, job *batch.Job) bool { if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { // When PodDisruptionConditions is enabled, orphan Pods and unschedulable // terminating Pods are marked as Failed. So we only need to check the phase. @@ -1751,10 +1667,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job, wFinalizers bool) bool { if p.Status.Phase == v1.PodFailed { return true } - // When tracking with finalizers: counting deleted Pods as failures to - // account for orphan Pods that never have a chance to reach the Failed - // phase. - return wFinalizers && p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded + // Count deleted Pods as failures to account for orphan Pods that + // never have a chance to reach the Failed phase. + return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded } func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index a341f3ff1db..686a2979f35 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -141,9 +141,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod { for i := 0; i < count; i++ { newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) newPod.Status = v1.PodStatus{Phase: status} - if hasJobTrackingAnnotation(job) { - newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer) - } + newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer) pods = append(pods, newPod) } return pods @@ -184,9 +182,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status } p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index) } - if hasJobTrackingAnnotation(job) { - p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) - } + p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) podIndexer.Add(p) } } @@ -207,9 +203,6 @@ func TestControllerSyncJob(t *testing.T) { wasSuspended bool suspend bool - // If set, it means that the case is exclusive to tracking with/without finalizers. 
- wFinalizersExclusive *bool - // pod setup podControllerError error jobKeyForget bool @@ -495,7 +488,6 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 1, }, "job failures, unsatisfied expectations": { - wFinalizersExclusive: pointer.Bool(true), parallelism: 2, completions: 5, deleting: true, @@ -725,175 +717,161 @@ func TestControllerSyncJob(t *testing.T) { } for name, tc := range testCases { - for _, wFinalizers := range []bool{false, true} { - t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) { - if wFinalizers && tc.podControllerError != nil { - t.Skip("Can't track status if finalizers can't be removed") - } - if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers { - t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive) - } - defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() + t.Run(name, func(t *testing.T) { + if tc.podControllerError != nil { + t.Skip("Can't track status if finalizers can't be removed") + } + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() - // job manager setup - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} - manager.podControl = &fakePodControl - manager.podStoreSynced = alwaysReady - manager.jobStoreSynced = alwaysReady + // job manager setup + clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady - // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode) - job.Spec.Suspend = pointer.Bool(tc.suspend) - key, err := controller.KeyFunc(job) - if err != nil { - t.Errorf("Unexpected error getting job key: %v", err) - } - if tc.fakeExpectationAtCreation < 0 { - manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation)) - } else if tc.fakeExpectationAtCreation > 0 { - manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation)) - } - if tc.wasSuspended { - job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) - } - if wFinalizers { - job.Annotations = map[string]string{ - batch.JobTrackingFinalizer: "", - } - } - if tc.deleting { - now := metav1.Now() - job.DeletionTimestamp = &now - } - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods) - setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) + // job & pods setup + job := newJob(tc.parallelism, tc.completions, 
tc.backoffLimit, tc.completionMode) + job.Spec.Suspend = pointer.Bool(tc.suspend) + key, err := controller.KeyFunc(job) + if err != nil { + t.Errorf("Unexpected error getting job key: %v", err) + } + if tc.fakeExpectationAtCreation < 0 { + manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation)) + } else if tc.fakeExpectationAtCreation > 0 { + manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation)) + } + if tc.wasSuspended { + job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now())) + } + if tc.deleting { + now := metav1.Now() + job.DeletionTimestamp = &now + } + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() + setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods) + setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) - actual := job - manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { - actual = job - return job, nil - } + actual := job + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { + actual = job + return job, nil + } - // run - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + // run + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) - // We need requeue syncJob task if podController error - if tc.podControllerError != nil { - if err == nil { - t.Error("Syncing jobs expected to return error on podControl exception") - } - } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { - if err == nil { - t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") - } - } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit { - if err == nil { - t.Error("Syncing jobs expected to return error when reached the podControl limit") - } - } else if err != nil { - t.Errorf("Unexpected error when syncing jobs: %v", err) + // We need requeue syncJob task if podController error + if tc.podControllerError != nil { + if err == nil { + t.Error("Syncing jobs expected to return error on podControl exception") } - if forget != tc.jobKeyForget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget) + } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { + if err == nil { + t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") } - // validate created/deleted pods - if int32(len(fakePodControl.Templates)) != tc.expectedCreations { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) + } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit { + if err == nil { + t.Error("Syncing jobs expected to return error when reached the podControl limit") } - if tc.completionMode == batch.IndexedCompletion { - checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name) - } else { - for _, p := range fakePodControl.Templates { - // Fake pod control doesn't add generate name from the owner reference. 
- if p.GenerateName != "" { - t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "") - } - if p.Spec.Hostname != "" { - t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname) - } + } else if err != nil { + t.Errorf("Unexpected error when syncing jobs: %v", err) + } + if forget != tc.jobKeyForget { + t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget) + } + // validate created/deleted pods + if int32(len(fakePodControl.Templates)) != tc.expectedCreations { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) + } + if tc.completionMode == batch.IndexedCompletion { + checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name) + } else { + for _, p := range fakePodControl.Templates { + // Fake pod control doesn't add generate name from the owner reference. + if p.GenerateName != "" { + t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "") + } + if p.Spec.Hostname != "" { + t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname) } } - if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { - t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) + } + if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) + } + // Each create should have an accompanying ControllerRef. + if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { + t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs)) + } + // Make sure the ControllerRefs are correct. + for _, controllerRef := range fakePodControl.ControllerRefs { + if got, want := controllerRef.APIVersion, "batch/v1"; got != want { + t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) } - // Each create should have an accompanying ControllerRef. - if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) { - t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs)) + if got, want := controllerRef.Kind, "Job"; got != want { + t.Errorf("controllerRef.Kind = %q, want %q", got, want) } - // Make sure the ControllerRefs are correct. - for _, controllerRef := range fakePodControl.ControllerRefs { - if got, want := controllerRef.APIVersion, "batch/v1"; got != want { - t.Errorf("controllerRef.APIVersion = %q, want %q", got, want) - } - if got, want := controllerRef.Kind, "Job"; got != want { - t.Errorf("controllerRef.Kind = %q, want %q", got, want) - } - if got, want := controllerRef.Name, job.Name; got != want { - t.Errorf("controllerRef.Name = %q, want %q", got, want) - } - if got, want := controllerRef.UID, job.UID; got != want { - t.Errorf("controllerRef.UID = %q, want %q", got, want) - } - if controllerRef.Controller == nil || *controllerRef.Controller != true { - t.Errorf("controllerRef.Controller is not set to true") - } + if got, want := controllerRef.Name, job.Name; got != want { + t.Errorf("controllerRef.Name = %q, want %q", got, want) } - // validate status - if actual.Status.Active != tc.expectedActive { - t.Errorf("Unexpected number of active pods. 
Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) + if got, want := controllerRef.UID, job.UID; got != want { + t.Errorf("controllerRef.UID = %q, want %q", got, want) } - if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" { - t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff) + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("controllerRef.Controller is not set to true") } - if actual.Status.Succeeded != tc.expectedSucceeded { - t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) + } + // validate status + if actual.Status.Active != tc.expectedActive { + t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) + } + if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" { + t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff) + } + if actual.Status.Succeeded != tc.expectedSucceeded { + t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) + } + if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" { + t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) + } + if actual.Status.Failed != tc.expectedFailed { + t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) + } + if actual.Status.StartTime != nil && tc.suspend { + t.Error("Unexpected .status.startTime not nil when suspend is true") + } + if actual.Status.StartTime == nil && !tc.suspend { + t.Error("Missing .status.startTime") + } + // validate conditions + if tc.expectedCondition != nil { + if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { + t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) } - if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" { - t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) + } else { + if cond := hasTrueCondition(actual); cond != nil { + t.Errorf("Got condition %s, want none", *cond) } - if actual.Status.Failed != tc.expectedFailed { - t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) - } - if actual.Status.StartTime != nil && tc.suspend { - t.Error("Unexpected .status.startTime not nil when suspend is true") - } - if actual.Status.StartTime == nil && !tc.suspend { - t.Error("Missing .status.startTime") - } - // validate conditions - if tc.expectedCondition != nil { - if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { - t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) - } - } else { - if cond := hasTrueCondition(actual); cond != nil { - t.Errorf("Got condition %s, want none", *cond) - } - } - if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { - t.Errorf("Unexpected conditions %v", actual.Status.Conditions) - } - // validate slow start - expectedLimit := 0 - for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ { - expectedLimit += controller.SlowStartInitialBatchSize << pass - } - if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit { - t.Errorf("Unexpected number of create calls. 
Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) - } - wantPodPatches := 0 - if wFinalizers { - wantPodPatches = tc.expectedPodPatches - } - if p := len(fakePodControl.Patches); p != wantPodPatches { - t.Errorf("Got %d pod patches, want %d", p, wantPodPatches) - } - }) - } + } + if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { + t.Errorf("Unexpected conditions %v", actual.Status.Conditions) + } + // validate slow start + expectedLimit := 0 + for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ { + expectedLimit += controller.SlowStartInitialBatchSize << pass + } + if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit { + t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) + } + if p := len(fakePodControl.Patches); p != tc.expectedPodPatches { + t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches) + } + }) } } @@ -922,94 +900,6 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI } } -// TestSyncJobLegacyTracking makes sure that a Job is only tracked with -// finalizers when the job has the annotation. -func TestSyncJobLegacyTracking(t *testing.T) { - cases := map[string]struct { - job batch.Job - wantUncounted bool - wantPatches int - }{ - "no annotation": { - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "ns", - }, - Spec: batch.JobSpec{ - Parallelism: pointer.Int32(1), - }, - }, - }, - "tracking annotation": { - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "ns", - Annotations: map[string]string{ - batch.JobTrackingFinalizer: "", - }, - }, - Spec: batch.JobSpec{ - Parallelism: pointer.Int32(1), - }, - }, - wantUncounted: true, - }, - "different annotation": { - job: batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "ns", - Annotations: map[string]string{ - "foo": "bar", - }, - }, - Spec: batch.JobSpec{ - Parallelism: pointer.Int32(1), - }, - }, - }, - } - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - // Job manager setup. - clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - manager.podStoreSynced = alwaysReady - manager.jobStoreSynced = alwaysReady - jobPatches := 0 - manager.patchJobHandler = func(context.Context, *batch.Job, []byte) error { - jobPatches++ - return nil - } - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job) - - var actual *batch.Job - manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { - actual = job - return job, nil - } - - // Run. - _, err := manager.syncJob(context.TODO(), testutil.GetKey(&tc.job, t)) - if err != nil { - t.Fatalf("Syncing job: %v", err) - } - - // Checks. 
- if got := actual.Status.UncountedTerminatedPods != nil; got != tc.wantUncounted { - t.Errorf("Job got uncounted pods %t, want %t", got, tc.wantUncounted) - } - if jobPatches != tc.wantPatches { - t.Errorf("Sync did %d patches, want %d", jobPatches, tc.wantPatches) - } - }) - } -} - func TestGetStatus(t *testing.T) { cases := map[string]struct { job batch.Job @@ -1018,24 +908,6 @@ func TestGetStatus(t *testing.T) { wantSucceeded int32 wantFailed int32 }{ - "without finalizers": { - job: batch.Job{ - Status: batch.JobStatus{ - Succeeded: 1, - Failed: 2, - }, - }, - pods: []*v1.Pod{ - buildPod().uid("a").phase(v1.PodSucceeded).Pod, - buildPod().uid("b").phase(v1.PodSucceeded).Pod, - buildPod().uid("c").phase(v1.PodFailed).Pod, - buildPod().uid("d").phase(v1.PodFailed).Pod, - buildPod().uid("e").phase(v1.PodFailed).Pod, - buildPod().uid("f").phase(v1.PodRunning).Pod, - }, - wantSucceeded: 2, - wantFailed: 3, - }, "some counted": { job: batch.Job{ Status: batch.JobStatus{ @@ -1103,16 +975,6 @@ func TestGetStatus(t *testing.T) { wantFailed: 5, }, "deleted pods": { - pods: []*v1.Pod{ - buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod, - buildPod().uid("b").phase(v1.PodFailed).deletionTimestamp().Pod, - buildPod().uid("c").phase(v1.PodRunning).deletionTimestamp().Pod, - buildPod().uid("d").phase(v1.PodPending).deletionTimestamp().Pod, - }, - wantSucceeded: 1, - wantFailed: 1, - }, - "deleted pods, tracking with finalizers": { job: batch.Job{ Status: batch.JobStatus{ Succeeded: 1, @@ -1134,10 +996,7 @@ func TestGetStatus(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - var uncounted *uncountedTerminatedPods - if tc.job.Status.UncountedTerminatedPods != nil { - uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - } + uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers) if succeeded != tc.wantSucceeded { t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded) @@ -1869,9 +1728,6 @@ func TestPastDeadlineJobFinished(t *testing.T) { start := metav1.NewTime(fakeClock.Now()) job.Status.StartTime = &start } - job.Annotations = map[string]string{ - batch.JobTrackingFinalizer: "", - } _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{}) if err != nil { @@ -2063,7 +1919,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { } testCases := map[string]struct { - wFinalizersExclusive *bool enableJobPodFailurePolicy bool enablePodDisruptionConditions bool job batch.Job @@ -2949,7 +2804,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { wantStatusSucceeded: 0, }, "terminating Pod considered failed when PodDisruptionConditions is disabled": { - wFinalizersExclusive: pointer.Bool(true), enableJobPodFailurePolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, @@ -3030,76 +2884,63 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { wantStatusActive: 1, // This is a replacement Pod: the terminating Pod is neither active nor failed. 
}, } - for _, wFinalizers := range []bool{false, true} { - for name, tc := range testCases { - t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) { - if tc.wFinalizersExclusive != nil && *tc.wFinalizersExclusive != wFinalizers { - t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive) - } - defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() - defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - manager.podStoreSynced = alwaysReady - manager.jobStoreSynced = alwaysReady - job := &tc.job + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + job := &tc.job - if wFinalizers { - job.Annotations = map[string]string{ - batch.JobTrackingFinalizer: "", - } + actual := job + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { + actual = job + return job, nil + } + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + for i, pod := range tc.pods { + pod := pod + pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) + if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { + pb.index(fmt.Sprintf("%v", i)) } + pb = pb.trackingFinalizer() + sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) + } - actual := job - manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { - actual = job - return job, nil - } - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - for i, pod := range tc.pods { - pod := pod - pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) - if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { - pb.index(fmt.Sprintf("%v", i)) - } - if wFinalizers { - pb.trackingFinalizer() - } - sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) - } + manager.syncJob(context.TODO(), testutil.GetKey(job, t)) - manager.syncJob(context.TODO(), testutil.GetKey(job, t)) - - if tc.wantConditions != nil { - for _, wantCondition := range *tc.wantConditions { - conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type) - if len(conditions) != 1 { - t.Fatalf("Expected a single 
completion condition. Got %#v for type: %q", conditions, wantCondition.Type) - } - condition := *conditions[0] - if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { - t.Errorf("Unexpected job condition (-want,+got):\n%s", diff) - } + if tc.wantConditions != nil { + for _, wantCondition := range *tc.wantConditions { + conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type) + if len(conditions) != 1 { + t.Fatalf("Expected a single completion condition. Got %#v for type: %q", conditions, wantCondition.Type) } - } else { - if cond := hasTrueCondition(actual); cond != nil { - t.Errorf("Got condition %s, want none", *cond) + condition := *conditions[0] + if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { + t.Errorf("Unexpected job condition (-want,+got):\n%s", diff) } } - // validate status - if actual.Status.Active != tc.wantStatusActive { - t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active) + } else { + if cond := hasTrueCondition(actual); cond != nil { + t.Errorf("Got condition %s, want none", *cond) } - if actual.Status.Succeeded != tc.wantStatusSucceeded { - t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded) - } - if actual.Status.Failed != tc.wantStatusFailed { - t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed) - } - }) - } + } + // validate status + if actual.Status.Active != tc.wantStatusActive { + t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active) + } + if actual.Status.Succeeded != tc.wantStatusSucceeded { + t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded) + } + if actual.Status.Failed != tc.wantStatusFailed { + t.Errorf("unexpected number of failed pods. 
Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed) + } + }) } } @@ -3291,50 +3132,46 @@ func TestGetPodsForJob(t *testing.T) { }, } for name, tc := range cases { - for _, wFinalizers := range []bool{false, true} { - t.Run(fmt.Sprintf("%s, finalizers=%t", name, wFinalizers), func(t *testing.T) { - job := job.DeepCopy() - if tc.jobDeleted { - job.DeletionTimestamp = &metav1.Time{} - } - clientSet := fake.NewSimpleClientset(job, otherJob) - jm, informer := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) - jm.podStoreSynced = alwaysReady - jm.jobStoreSynced = alwaysReady - cachedJob := job.DeepCopy() - if tc.jobDeletedInCache { - cachedJob.DeletionTimestamp = &metav1.Time{} - } - informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob) - informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob) - for _, p := range tc.pods { - informer.Core().V1().Pods().Informer().GetIndexer().Add(p) - } + t.Run(name, func(t *testing.T) { + job := job.DeepCopy() + if tc.jobDeleted { + job.DeletionTimestamp = &metav1.Time{} + } + clientSet := fake.NewSimpleClientset(job, otherJob) + jm, informer := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + jm.podStoreSynced = alwaysReady + jm.jobStoreSynced = alwaysReady + cachedJob := job.DeepCopy() + if tc.jobDeletedInCache { + cachedJob.DeletionTimestamp = &metav1.Time{} + } + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob) + informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob) + for _, p := range tc.pods { + informer.Core().V1().Pods().Informer().GetIndexer().Add(p) + } - pods, err := jm.getPodsForJob(context.TODO(), job, wFinalizers) - if err != nil { - t.Fatalf("getPodsForJob() error: %v", err) + pods, err := jm.getPodsForJob(context.TODO(), job) + if err != nil { + t.Fatalf("getPodsForJob() error: %v", err) + } + got := make([]string, len(pods)) + var gotFinalizer []string + for i, p := range pods { + got[i] = p.Name + if hasJobTrackingFinalizer(p) { + gotFinalizer = append(gotFinalizer, p.Name) } - got := make([]string, len(pods)) - var gotFinalizer []string - for i, p := range pods { - got[i] = p.Name - if hasJobTrackingFinalizer(p) { - gotFinalizer = append(gotFinalizer, p.Name) - } - } - sort.Strings(got) - if diff := cmp.Diff(tc.wantPods, got); diff != "" { - t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff) - } - if wFinalizers { - sort.Strings(gotFinalizer) - if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" { - t.Errorf("Pods with finalizers (-want,+got):\n%s", diff) - } - } - }) - } + } + sort.Strings(got) + if diff := cmp.Diff(tc.wantPods, got); diff != "" { + t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff) + } + sort.Strings(gotFinalizer) + if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" { + t.Errorf("Pods with finalizers (-want,+got):\n%s", diff) + } + }) } } @@ -4337,9 +4174,6 @@ func TestFinalizersRemovedExpectations(t *testing.T) { } job := newJob(2, 2, 6, batch.NonIndexedCompletion) - job.Annotations = map[string]string{ - batch.JobTrackingFinalizer: "", - } sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) 
podInformer := sharedInformers.Core().V1().Pods().Informer() diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 84a9e85a1f7..7d6fcfd8e1d 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -687,9 +687,6 @@ func TestNonParallelJob(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if !hasJobTrackingAnnotation(jobObj) { - t.Error("apiserver created job without tracking annotation") - } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 1, Ready: pointer.Int32(0), @@ -728,8 +725,7 @@ func TestNonParallelJob(t *testing.T) { func TestParallelJob(t *testing.T) { cases := map[string]struct { - trackWithFinalizers bool - enableReadyPods bool + enableReadyPods bool }{ "none": {}, "ready pods": { @@ -820,9 +816,7 @@ func TestParallelJob(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) - if tc.trackWithFinalizers { - validateTerminatedPodsTrackingFinalizerMetric(t, 7) - } + validateTerminatedPodsTrackingFinalizerMetric(t, 7) }) } } @@ -911,9 +905,6 @@ func TestParallelJobWithCompletions(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if !hasJobTrackingAnnotation(jobObj) { - t.Error("apiserver created job without tracking annotation") - } want := podsByStatus{Active: 54} if tc.enableReadyPods { want.Ready = pointer.Int32Ptr(0) @@ -990,9 +981,6 @@ func TestIndexedJob(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if !hasJobTrackingAnnotation(jobObj) { - t.Error("apiserver created job without tracking annotation") - } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 3, Ready: pointer.Int32(0), @@ -1042,7 +1030,6 @@ func TestIndexedJob(t *testing.T) { // We expect that large jobs are more commonly used as Indexed. And they are // also faster to track, as they need less API calls. func BenchmarkLargeIndexedJob(b *testing.B) { - defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() closeFn, restConfig, clientSet, ns := setup(b, "indexed") restConfig.QPS = 100 restConfig.Burst = 100 @@ -1144,9 +1131,6 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if !hasJobTrackingAnnotation(jobObj) { - t.Error("apiserver didn't add the tracking annotation") - } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 2, Ready: pointer.Int32(0), @@ -1293,9 +1277,6 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if !hasJobTrackingAnnotation(jobObj) { - t.Error("apiserver didn't add the tracking annotation") - } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 1, Ready: pointer.Int32(0), @@ -1882,14 +1863,6 @@ func hasJobTrackingFinalizer(obj metav1.Object) bool { return false } -func hasJobTrackingAnnotation(job *batchv1.Job) bool { - if job.Annotations == nil { - return false - } - _, ok := job.Annotations[batchv1.JobTrackingFinalizer] - return ok -} - func setDuringTest(val *int, newVal int) func() { origVal := *val *val = newVal