diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 5e94893f6f1..6a44ec3c036 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -958,12 +958,37 @@ func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
 	return result
 }
 
+func FilterTerminatingPods(pods []*v1.Pod) []*v1.Pod {
+	var result []*v1.Pod
+	for _, p := range pods {
+		if IsPodTerminating(p) {
+			result = append(result, p)
+		}
+	}
+	return result
+}
+
+func CountTerminatingPods(pods []*v1.Pod) int32 {
+	numberOfTerminatingPods := 0
+	for _, p := range pods {
+		if IsPodTerminating(p) {
+			numberOfTerminatingPods += 1
+		}
+	}
+	return int32(numberOfTerminatingPods)
+}
+
 func IsPodActive(p *v1.Pod) bool {
 	return v1.PodSucceeded != p.Status.Phase &&
 		v1.PodFailed != p.Status.Phase &&
 		p.DeletionTimestamp == nil
 }
 
+func IsPodTerminating(p *v1.Pod) bool {
+	return !podutil.IsPodTerminal(p) &&
+		p.DeletionTimestamp != nil
+}
+
 // FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
 func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
 	activeFilter := func(rs *apps.ReplicaSet) bool {
diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go
index 99f23df4dcb..bda74246226 100644
--- a/pkg/controller/controller_utils_test.go
+++ b/pkg/controller/controller_utils_test.go
@@ -380,6 +380,31 @@ func TestDeletePodsAllowsMissing(t *testing.T) {
 	assert.True(t, apierrors.IsNotFound(err))
 }
 
+func TestCountTerminatingPods(t *testing.T) {
+	now := metav1.Now()
+
+	// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
+	rc := newReplicationController(0)
+	podList := newPodList(nil, 7, v1.PodRunning, rc)
+	podList.Items[0].Status.Phase = v1.PodSucceeded
+	podList.Items[1].Status.Phase = v1.PodFailed
+	podList.Items[2].Status.Phase = v1.PodPending
+	podList.Items[2].SetDeletionTimestamp(&now)
+	podList.Items[3].Status.Phase = v1.PodRunning
+	podList.Items[3].SetDeletionTimestamp(&now)
+	var podPointers []*v1.Pod
+	for i := range podList.Items {
+		podPointers = append(podPointers, &podList.Items[i])
+	}
+
+	terminatingPods := CountTerminatingPods(podPointers)
+
+	assert.Equal(t, int32(2), terminatingPods)
+
+	terminatingList := FilterTerminatingPods(podPointers)
+	assert.Equal(t, 2, len(terminatingList))
+}
+
 func TestActivePodFiltering(t *testing.T) {
 	logger, _ := ktesting.NewTestContext(t)
 	// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go
index 8d5bc8bad2d..87d4ca9cfce 100644
--- a/pkg/controller/job/indexed_job_utils.go
+++ b/pkg/controller/job/indexed_job_utils.go
@@ -243,6 +243,7 @@ func parseIndexesFromString(logger klog.Logger, indexesStr string, completions i
 // firstPendingIndexes returns `count` indexes less than `completions` that are
 // not covered by `activePods`, `succeededIndexes` or `failedIndexes`.
+// When the PodReplacementPolicy is Failed, indexes with terminating pods are also treated as not pending.
 func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
 	if count == 0 {
 		return nil
 	}
@@ -250,6 +251,10 @@ func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
 	active := getIndexes(jobCtx.activePods)
 	result := make([]int, 0, count)
 	nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
+	if onlyReplaceFailedPods(jobCtx.job) {
+		terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods))
+		nonPending = nonPending.withOrderedIndexes(sets.List(terminating))
+	}
 	if jobCtx.failedIndexes != nil {
 		nonPending = nonPending.merge(*jobCtx.failedIndexes)
 	}
diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go
index 9d6ccc901c9..a79fa744631 100644
--- a/pkg/controller/job/indexed_job_utils_test.go
+++ b/pkg/controller/job/indexed_job_utils_test.go
@@ -758,6 +758,7 @@ func TestFirstPendingIndexes(t *testing.T) {
 				activePods:       hollowPodsWithIndexPhase(tc.activePods),
 				succeededIndexes: tc.succeededIndexes,
 				failedIndexes:    tc.failedIndexes,
+				job:              newJob(1, 1, 1, batch.IndexedCompletion),
 			}
 			got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
 			if diff := cmp.Diff(tc.want, got); diff != "" {
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index f6bce071742..8f46c84562d 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -144,6 +144,7 @@ type syncJobCtx struct {
 	expectedRmFinalizers            sets.Set[string]
 	uncounted                       *uncountedTerminatedPods
 	podsWithDelayedDeletionPerIndex map[int]*v1.Pod
+	terminating                     *int32
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -783,11 +784,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if err != nil {
 		return err
 	}
-
+	var terminating *int32
+	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		terminating = pointer.Int32(controller.CountTerminatingPods(pods))
+	}
 	jobCtx := &syncJobCtx{
 		job:                  &job,
 		pods:                 pods,
 		activePods:           controller.FilterActivePods(logger, pods),
+		terminating:          terminating,
 		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
 		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
 	}
@@ -919,6 +924,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
+	needsStatusUpdate = needsStatusUpdate || !pointer.Int32Equal(job.Status.Terminating, jobCtx.terminating)
+	job.Status.Terminating = jobCtx.terminating
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -1453,6 +1460,17 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
 
+	var terminating int32 = 0
+	if onlyReplaceFailedPods(jobCtx.job) {
+		// When a PodFailurePolicy is specified but the JobPodReplacementPolicy
+		// feature is disabled, we still need to count terminating pods for
+		// replica counts, but we do not report them in the Job status.
+		if jobCtx.terminating == nil {
+			terminating = controller.CountTerminatingPods(jobCtx.pods)
+		} else {
+			terminating = *jobCtx.terminating
+		}
+	}
 	wantActive := int32(0)
 	if job.Spec.Completions == nil {
 		// Job does not specify a number of completions.  Therefore, number active
@@ -1475,7 +1493,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		}
 	}
 
-	rmAtLeast := active - wantActive
+	rmAtLeast := active + terminating - wantActive
 	if rmAtLeast < 0 {
 		rmAtLeast = 0
 	}
@@ -1495,7 +1513,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
 
-	if active < wantActive {
+	if diff := wantActive - terminating - active; diff > 0 {
 		var remainingTime time.Duration
 		if !hasBackoffLimitPerIndex(job) {
 			// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
@@ -1505,7 +1523,6 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
 		}
-		diff := wantActive - active
 		if diff > int32(MaxPodCreateDeletePerSync) {
 			diff = int32(MaxPodCreateDeletePerSync)
 		}
@@ -1797,6 +1814,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job) bool {
 	if p.Status.Phase == v1.PodFailed {
 		return true
 	}
+	if onlyReplaceFailedPods(job) {
+		return p.Status.Phase == v1.PodFailed
+	}
 	// Count deleted Pods as failures to account for orphan Pods that
 	// never have a chance to reach the Failed phase.
 	return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
@@ -1849,3 +1869,13 @@ func countReadyPods(pods []*v1.Pod) int32 {
 	}
 	return cnt
 }
+
+// onlyReplaceFailedPods checks whether pods should be replaced only once they have fully terminated.
+// The PodReplacementPolicy controls when to recreate pods that are marked as terminating;
+// Failed means a replacement is created only after the pod has reached a terminal phase.
+func onlyReplaceFailedPods(job *batch.Job) bool {
+	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy != nil && *job.Spec.PodReplacementPolicy == batch.Failed {
+		return true
+	}
+	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
+}
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 5b1f0338265..914949cb36b 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -167,7 +167,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
 	return pods
 }
 
-func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, readyPods int) {
+func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, terminatingPods, readyPods int) {
 	for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
 		podIndexer.Add(pod)
 	}
@@ -190,6 +190,14 @@ func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, acti
 	for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
 		podIndexer.Add(pod)
 	}
+	terminating := newPodList(terminatingPods, v1.PodRunning, job)
+	for _, p := range terminating {
+		now := metav1.Now()
+		p.DeletionTimestamp = &now
+	}
+	for _, pod := range terminating {
+		podIndexer.Add(pod)
+	}
 }
 
 func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) {
@@ -234,17 +242,18 @@ func TestControllerSyncJob(t *testing.T) {
 
 	testCases := map[string]struct {
 		// job setup
-		parallelism    int32
-		completions    int32
-		backoffLimit   int32
-		deleting       bool
-		podLimit       int
-		completionMode batch.CompletionMode
-		wasSuspended   bool
-		suspend        bool
-		initialStatus  *jobInitialStatus
-		backoffRecord  *backoffRecord
-		controllerTime *time.Time
+		parallelism          int32
+		completions          int32
+		backoffLimit         int32
+		deleting             bool
+		podLimit             int
+		completionMode       batch.CompletionMode
+		wasSuspended         bool
+		suspend              bool
+		podReplacementPolicy *batch.PodReplacementPolicy
+		initialStatus        *jobInitialStatus
+		backoffRecord        *backoffRecord
+		controllerTime       *time.Time
 
 		// pod setup
 
@@ -257,6 +266,7 @@ func TestControllerSyncJob(t *testing.T) {
 		readyPods                 int
 		succeededPods             int
 		failedPods                int
+		terminatingPods           int
 		podsWithIndexes           []indexPhase
 		fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations
 
@@ -268,6 +278,7 @@ func TestControllerSyncJob(t *testing.T) {
 		expectedSucceeded       int32
 		expectedCompletedIdxs   string
 		expectedFailed          int32
+		expectedTerminating     *int32
 		expectedCondition       *batch.JobConditionType
 		expectedConditionStatus v1.ConditionStatus
 		expectedConditionReason string
@@ -275,8 +286,9 @@ func TestControllerSyncJob(t *testing.T) {
 		expectedPodPatches int
 
 		// features
-		jobReadyPodsEnabled   bool
-		podIndexLabelDisabled bool
+		jobReadyPodsEnabled     bool
+		podIndexLabelDisabled   bool
+		jobPodReplacementPolicy bool
 	}{
 		"job start": {
 			parallelism: 2,
@@ -335,6 +347,35 @@
 			expectedSucceeded:  1,
 			expectedPodPatches: 1,
 		},
+		"WQ job: recreate pods when failed": {
+			parallelism:             1,
+			completions:             -1,
+			backoffLimit:            6,
+			activePods:              1,
+			failedPods:              1,
+			podReplacementPolicy:    podReplacementPolicy(batch.Failed),
+			jobPodReplacementPolicy: true,
+			terminatingPods:         1,
+			expectedTerminating:     pointer.Int32(1),
+			expectedPodPatches:      2,
+			expectedDeletions:       1,
+			expectedFailed:          1,
+		},
+		"WQ job: recreate pods when terminating or failed": {
+			parallelism:             1,
+			completions:             -1,
+			backoffLimit:            6,
+			activePods:              1,
+			failedPods:              1,
+			podReplacementPolicy:    podReplacementPolicy(batch.TerminatingOrFailed),
+			jobPodReplacementPolicy: true,
+			terminatingPods:         1,
+			expectedTerminating:     pointer.Int32(1),
+			expectedActive:          1,
+			expectedPodPatches:      2,
+			expectedFailed:          2,
+		},
+
 		"too few active pods and active back-off": {
 			parallelism: 1,
 			completions: 1,
@@ -585,6 +626,33 @@
 			expectedActive:         2,
 			expectedCreatedIndexes: sets.New(0, 1),
 		},
+		"indexed job with some pods deleted, podReplacementPolicy Failed": {
+			parallelism:             2,
+			completions:             5,
+			backoffLimit:            6,
+			completionMode:          batch.IndexedCompletion,
+			expectedCreations:       1,
+			expectedActive:          1,
+			expectedCreatedIndexes:  sets.New(0),
+			podReplacementPolicy:    podReplacementPolicy(batch.Failed),
+			jobPodReplacementPolicy: true,
+			terminatingPods:         1,
+			expectedTerminating:     pointer.Int32(1),
+		},
+		"indexed job with some pods deleted, podReplacementPolicy TerminatingOrFailed": {
+			parallelism:             2,
+			completions:             5,
+			backoffLimit:            6,
+			completionMode:          batch.IndexedCompletion,
+			expectedCreations:       2,
+			expectedActive:          2,
+			expectedCreatedIndexes:  sets.New(0, 1),
+			podReplacementPolicy:    podReplacementPolicy(batch.TerminatingOrFailed),
+			jobPodReplacementPolicy: true,
+			terminatingPods:         1,
+			expectedTerminating:     pointer.Int32(1),
+			expectedPodPatches:      1,
+		},
 		"indexed job completed": {
 			parallelism: 2,
 			completions: 3,
@@ -800,7 +868,7 @@
 			logger, _ := ktesting.NewTestContext(t)
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)()
-
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)()
 			// job manager setup
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -820,6 +888,9 @@
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
 			job.Spec.Suspend = pointer.Bool(tc.suspend)
+			if tc.jobPodReplacementPolicy {
+				job.Spec.PodReplacementPolicy = tc.podReplacementPolicy
+			}
 			if tc.initialStatus != nil {
 				startTime := metav1.Now()
 				job.Status.StartTime = &startTime
@@ -855,7 +926,7 @@
 			}
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-			setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods)
+			setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.terminatingPods, tc.readyPods)
 			setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
 
 			actual := job
@@ -937,6 +1008,9 @@
 			if actual.Status.Failed != tc.expectedFailed {
 				t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
 			}
+			if diff := cmp.Diff(tc.expectedTerminating, actual.Status.Terminating); diff != "" {
+				t.Errorf("Unexpected number of terminating pods (-want,+got): %s", diff)
+			}
 			if actual.Status.StartTime != nil && tc.suspend {
 				t.Error("Unexpected .status.startTime not nil when suspend is true")
 			}
@@ -1905,7 +1979,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		job.Status.StartTime = &start
 		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-		setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0)
+		setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0, 0)
 
 		// run
 		err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
@@ -2175,12 +2249,14 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 	testCases := map[string]struct {
 		enableJobPodFailurePolicy     bool
 		enablePodDisruptionConditions bool
+		enableJobPodReplacementPolicy bool
 		job                           batch.Job
 		pods                          []v1.Pod
 		wantConditions                *[]batch.JobCondition
 		wantStatusFailed              int32
 		wantStatusActive              int32
 		wantStatusSucceeded           int32
+		wantStatusTerminating         *int32
 	}{
 		"default handling for pod failure if the container matching the exit codes does not match the containerName restriction": {
 			enableJobPodFailurePolicy: true,
@@ -3149,15 +3225,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 					},
 				},
 			},
-			wantConditions: &[]batch.JobCondition{
-				{
-					Type:    batch.JobFailed,
-					Status:  v1.ConditionTrue,
-					Reason:  "BackoffLimitExceeded",
-					Message: "Job has reached the specified backoff limit",
-				},
-			},
-			wantStatusFailed: 1,
 		},
 		"terminating Pod not considered failed when PodDisruptionConditions is enabled": {
 			enableJobPodFailurePolicy: true,
@@ -3195,13 +3262,17 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 					},
 				},
 			},
-			wantStatusActive: 1, // This is a replacement Pod: the terminating Pod is neither active nor failed.
 		},
 	}
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)()
+
+			if tc.job.Spec.PodReplacementPolicy == nil {
+				tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed)
+			}
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
@@ -3254,6 +3325,9 @@
 			if actual.Status.Failed != tc.wantStatusFailed {
 				t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed)
 			}
+			if pointer.Int32Deref(actual.Status.Terminating, 0) != pointer.Int32Deref(tc.wantStatusTerminating, 0) {
+				t.Errorf("unexpected number of terminating pods. Expected %d, saw %d\n", pointer.Int32Deref(tc.wantStatusTerminating, 0), pointer.Int32Deref(actual.Status.Terminating, 0))
+			}
 		})
 	}
 }
@@ -5135,6 +5209,10 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabel
 	}
 }
 
+func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPolicy {
+	return &m
+}
+
 func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
 	t.Helper()
 	verifyEmptyQueue(ctx, t, jm)
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index c2607a296bd..49338637aeb 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -1667,6 +1667,159 @@ func TestIndexedJob(t *testing.T) {
 	validateTerminatedPodsTrackingFinalizerMetric(t, 5)
 }
 
+func TestJobPodReplacementPolicy(t *testing.T) {
+	const podCount int32 = 2
+	indexedCompletion := batchv1.IndexedCompletion
+	nonIndexedCompletion := batchv1.NonIndexedCompletion
+	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
+		return &obj
+	}
+	jobSpecIndexedDefault := &batchv1.JobSpec{
+		Parallelism:    pointer.Int32Ptr(podCount),
+		Completions:    pointer.Int32Ptr(podCount),
+		CompletionMode: &indexedCompletion,
+	}
+	cases := map[string]struct {
+		podReplacementPolicyEnabled bool
+		wantTerminating             *int32
+		wantFailed                  int
+		wantActive                  int
+		jobSpec                     *batchv1.JobSpec
+	}{
+		"feature flag off, delete pods and verify no terminating status": {
+			jobSpec:    jobSpecIndexedDefault,
+			wantActive: int(podCount),
+			wantFailed: int(podCount),
+		},
+		"feature flag true, delete pods and verify terminating status": {
+			podReplacementPolicyEnabled: true,
+			jobSpec:                     jobSpecIndexedDefault,
+			wantTerminating:             pointer.Int32(podCount),
+			wantFailed:                  int(podCount),
+		},
+		"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          pointer.Int32Ptr(podCount),
+				Completions:          pointer.Int32Ptr(podCount),
+				CompletionMode:       &indexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
+			},
+			wantTerminating: pointer.Int32(podCount),
+			wantFailed:      int(podCount),
+		},
+		"feature flag true, delete pods, verify terminating status and recreate once failed": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          pointer.Int32Ptr(podCount),
+				Completions:          pointer.Int32Ptr(podCount),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+			},
+			wantTerminating: pointer.Int32(podCount),
+		},
+		"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          pointer.Int32Ptr(podCount),
+				Completions:          pointer.Int32Ptr(podCount),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+			},
+			wantTerminating: pointer.Int32(podCount),
+		},
+		"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
+			podReplacementPolicyEnabled: false,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          pointer.Int32Ptr(podCount),
+				Completions:          pointer.Int32Ptr(podCount),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				PodFailurePolicy: &batchv1.PodFailurePolicy{
+					Rules: []batchv1.PodFailurePolicyRule{
+						{
+							Action: batchv1.PodFailurePolicyActionFailJob,
+							OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
+								Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
+								Values:   []int32{5},
+							},
+						},
+					},
+				},
+			},
+			wantActive: int(podCount),
+		},
+	}
+	for name, tc := range cases {
+		tc := tc
+		t.Run(name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled)()
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
+
+			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
+			defer closeFn()
+			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+			defer cancel()
+			resetMetrics()
+
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+				Spec: *tc.jobSpec,
+			})
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
+
+			// Wait for pods to start up.
+			err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
+				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
+				if err != nil {
+					return false, err
+				}
+				if job.Status.Active == int32(podCount) {
+					return true, nil
+				}
+				return false, nil
+			})
+			if err != nil {
+				t.Fatalf("Error waiting for Job pods to become active: %v", err)
+			}
+			pods, errList := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
+			if errList != nil {
+				t.Fatalf("Failed to list pods: %v", errList)
+			}
+			updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) {
+				pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion")
+			})
+			err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
+				metav1.DeleteOptions{},
+				metav1.ListOptions{
+					Limit: 1000,
+				})
+			if err != nil {
+				t.Fatalf("Failed to cleanup Pods: %v", err)
+			}
+
+			podsDelete, errList2 := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
+			if errList2 != nil {
+				t.Fatalf("Failed to list pods: %v", errList2)
+			}
+			for _, val := range podsDelete.Items {
+				if val.DeletionTimestamp == nil {
+					t.Fatalf("Deletion not registered.")
+				}
+			}
+
+			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+				Terminating: tc.wantTerminating,
+				Failed:      tc.wantFailed,
+				Active:      tc.wantActive,
+				Ready:       pointer.Int32(0),
+			})
+		})
+	}
+}
+
 func TestElasticIndexedJob(t *testing.T) {
 	const initialCompletions int32 = 3
 	type jobUpdate struct {
@@ -2360,13 +2513,14 @@ func TestNodeSelectorUpdate(t *testing.T) {
 }
 
 type podsByStatus struct {
-	Active    int
-	Ready     *int32
-	Failed    int
-	Succeeded int
+	Active      int
+	Ready       *int32
+	Failed      int
+	Succeeded   int
+	Terminating *int32
 }
 
-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 	t.Helper()
 	var actualCounts podsByStatus
 	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
@@ -2375,16 +2529,21 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
 			t.Fatalf("Failed to get updated Job: %v", err)
 		}
 		actualCounts = podsByStatus{
-			Active:    int(updatedJob.Status.Active),
-			Ready:     updatedJob.Status.Ready,
-			Succeeded: int(updatedJob.Status.Succeeded),
-			Failed:    int(updatedJob.Status.Failed),
+			Active:      int(updatedJob.Status.Active),
+			Ready:       updatedJob.Status.Ready,
+			Succeeded:   int(updatedJob.Status.Succeeded),
+			Failed:      int(updatedJob.Status.Failed),
+			Terminating: updatedJob.Status.Terminating,
 		}
 		return cmp.Equal(actualCounts, desired), nil
 	}); err != nil {
 		diff := cmp.Diff(desired, actualCounts)
 		t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
 	}
+}
+func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+	t.Helper()
+	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
 	var active []*v1.Pod
 	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
 		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
@@ -2627,6 +2786,22 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
 	return int(updated), nil
 }
 
+func updatePod(t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
+	for _, val := range pods {
+		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+			newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(context.TODO(), val.Name, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			updateFunc(newPod)
+			_, err = clientSet.CoreV1().Pods(val.Namespace).Update(context.TODO(), newPod, metav1.UpdateOptions{})
+			return err
+		}); err != nil {
+			t.Fatalf("Failed to update pod %s: %v", val.Name, err)
+		}
+	}
+}
+
 func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
 	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {