diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 251aef081ad..b0fb7ba1c59 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -899,6 +899,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { jobCtx.finishedCondition = nil } active -= deleted + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += deleted + } manageJobErr = err } else { manageJobCalled := false @@ -1504,6 +1507,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += removed + } return active, metrics.JobSyncActionPodsDeleted, err } @@ -1553,6 +1559,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + *jobCtx.terminating += removed + } // While it is possible for a Job to require both pod creations and // deletions at the same time (e.g. indexed Jobs with repeated indexes), we // restrict ourselves to either just pod deletion or pod creation in any diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index f8be81228e5..cee54139d45 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -400,10 +400,10 @@ func TestControllerSyncJob(t *testing.T) { backoffLimit: 6, activePods: 2, failedPods: 0, - terminatingPods: 4, + terminatingPods: 5, podReplacementPolicy: podReplacementPolicy(batch.Failed), jobPodReplacementPolicy: true, - expectedTerminating: ptr.To[int32](4), + expectedTerminating: ptr.To[int32](6), expectedReady: ptr.To[int32](0), expectedActive: 1, expectedDeletions: 1, @@ -3640,7 +3640,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3697,7 +3697,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3746,7 +3746,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", FailedIndexes: ptr.To(""), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, @@ -3868,7 +3868,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 1, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -3931,7 +3931,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 2, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ @@ -4761,7 +4761,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { wantStatus: batch.JobStatus{ Failed: 3, Succeeded: 1, - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](1), FailedIndexes: ptr.To("0,2"), CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 25a08d5f37b..e83a1ccb412 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -514,6 +514,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes sets.Set[int] wantCompletedIndexes string wantFailedIndexes *string + wantTerminating *int32 } podTemplateSpec := v1.PodTemplateSpec{ @@ -561,6 +562,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "0", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -595,6 +597,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "0", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete}, @@ -630,6 +633,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailed: 0, wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -640,6 +644,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(1)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -675,6 +680,7 @@ func TestSuccessPolicy(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailed: 0, wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -685,6 +691,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 0, wantSucceeded: 1, wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(1)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, @@ -723,6 +730,7 @@ func TestSuccessPolicy(t *testing.T) { wantFailed: 1, wantFailedIndexes: ptr.To("0"), wantSucceeded: 0, + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -734,6 +742,7 @@ func TestSuccessPolicy(t *testing.T) { wantSucceeded: 1, wantFailedIndexes: ptr.To("0"), wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(0)), }, }, wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed}, @@ -774,7 +783,7 @@ func TestSuccessPolicy(t *testing.T) { Succeeded: podTermination.wantSucceeded, Failed: podTermination.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: podTermination.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes) } @@ -861,7 +870,7 @@ func TestSuccessPolicy_ReEnabling(t *testing.T) { Active: 0, Succeeded: 3, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: ptr.To[int32](2), }) validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil) @@ -1168,6 +1177,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantCompletedIndexes string wantFailedIndexes *string wantReplacementPodFailureCount *int + wantTerminating *int32 } podTemplateSpec := v1.PodTemplateSpec{ @@ -1208,6 +1218,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(1), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobComplete, @@ -1238,6 +1249,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(1), + wantTerminating: ptr.To(int32(0)), }, { status: v1.PodStatus{ @@ -1248,6 +1260,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActiveIndexes: sets.New(0, 1), wantFailedIndexes: ptr.To(""), wantReplacementPodFailureCount: ptr.To(2), + wantTerminating: ptr.To(int32(0)), }, { status: v1.PodStatus{ @@ -1257,6 +1270,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 3, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1292,6 +1306,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(0, 1, 2), wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1302,6 +1317,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 2, wantActiveIndexes: sets.New(0, 1, 2), wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(0)), }, { index: 2, @@ -1310,6 +1326,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { }, wantFailed: 5, wantFailedIndexes: ptr.To(""), + wantTerminating: ptr.To(int32(2)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1344,6 +1361,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1354,6 +1372,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantSucceeded: 1, wantFailedIndexes: ptr.To("0"), wantCompletedIndexes: "1", + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1389,6 +1408,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1, 2), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1398,6 +1418,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantActive: 0, wantFailed: 3, wantFailedIndexes: ptr.To("0,1"), + wantTerminating: ptr.To(int32(1)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1457,6 +1478,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { wantFailed: 1, wantActiveIndexes: sets.New(1), wantFailedIndexes: ptr.To("0"), + wantTerminating: ptr.To(int32(0)), }, { index: 1, @@ -1471,6 +1493,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { }, wantFailed: 2, wantFailedIndexes: ptr.To("0,1"), + wantTerminating: ptr.To(int32(0)), }, }, wantJobConditionType: batchv1.JobFailed, @@ -1517,7 +1540,7 @@ func TestBackoffLimitPerIndex(t *testing.T) { Succeeded: podTermination.wantSucceeded, Failed: podTermination.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: podTermination.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes) if podTermination.wantReplacementPodFailureCount != nil { @@ -2730,6 +2753,7 @@ func TestElasticIndexedJob(t *testing.T) { wantFailed int wantRemainingIndexes sets.Set[int] wantActivePods int + wantTerminating *int32 } cases := map[string]struct { featureGate bool @@ -2739,7 +2763,8 @@ func TestElasticIndexedJob(t *testing.T) { "feature flag off, mutation not allowed": { jobUpdates: []jobUpdate{ { - completions: ptr.To[int32](4), + completions: ptr.To[int32](4), + wantTerminating: ptr.To[int32](0), }, }, wantErr: apierrors.NewInvalid( @@ -2756,6 +2781,7 @@ func TestElasticIndexedJob(t *testing.T) { completions: ptr.To[int32](4), succeedIndexes: []int{0, 1, 2, 3}, wantSucceededIndexes: "0-3", + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2770,6 +2796,7 @@ func TestElasticIndexedJob(t *testing.T) { wantFailed: 1, wantRemainingIndexes: sets.New(0, 2), wantActivePods: 2, + wantTerminating: ptr.To[int32](0), }, // Scale down completions 3->1, verify prev failure out of range still counts // but succeeded out of range does not. @@ -2778,6 +2805,7 @@ func TestElasticIndexedJob(t *testing.T) { succeedIndexes: []int{0}, wantSucceededIndexes: "0", wantFailed: 1, + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2790,18 +2818,21 @@ func TestElasticIndexedJob(t *testing.T) { wantSucceededIndexes: "2", wantRemainingIndexes: sets.New(0, 1), wantActivePods: 2, + wantTerminating: ptr.To[int32](0), }, // Scale completions down 3->2 to exclude previously succeeded index. { completions: ptr.To[int32](2), wantRemainingIndexes: sets.New(0, 1), wantActivePods: 2, + wantTerminating: ptr.To[int32](0), }, // Scale completions back up to include previously succeeded index that was temporarily out of range. { completions: ptr.To[int32](3), succeedIndexes: []int{0, 1, 2}, wantSucceededIndexes: "0-2", + wantTerminating: ptr.To[int32](0), }, }, }, @@ -2809,7 +2840,8 @@ func TestElasticIndexedJob(t *testing.T) { featureGate: true, jobUpdates: []jobUpdate{ { - completions: ptr.To[int32](0), + completions: ptr.To[int32](0), + wantTerminating: ptr.To[int32](3), }, }, }, @@ -2887,7 +2919,7 @@ func TestElasticIndexedJob(t *testing.T) { Succeeded: len(update.succeedIndexes), Failed: update.wantFailed, Ready: ptr.To[int32](0), - Terminating: ptr.To[int32](0), + Terminating: update.wantTerminating, }) validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil) }