diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index cee54139d45..9f5ceac0185 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -400,10 +400,10 @@ func TestControllerSyncJob(t *testing.T) {
 			backoffLimit:            6,
 			activePods:              2,
 			failedPods:              0,
-			terminatingPods:         5,
+			terminatingPods:         4,
 			podReplacementPolicy:    podReplacementPolicy(batch.Failed),
 			jobPodReplacementPolicy: true,
-			expectedTerminating:     ptr.To[int32](6),
+			expectedTerminating:     ptr.To[int32](5),
 			expectedReady:           ptr.To[int32](0),
 			expectedActive:          1,
 			expectedDeletions:       1,
@@ -497,6 +497,18 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 1,
 			expectedReady:      ptr.To[int32](0),
 		},
+		"too many active pods; PodReplacementPolicy enabled": {
+			parallelism:             2,
+			completions:             5,
+			backoffLimit:            6,
+			activePods:              3,
+			jobPodReplacementPolicy: true,
+			expectedDeletions:       1,
+			expectedActive:          2,
+			expectedPodPatches:      1,
+			expectedReady:           ptr.To[int32](0),
+			expectedTerminating:     ptr.To[int32](1),
+		},
 		"too many active pods, with controller error": {
 			parallelism: 2,
 			completions: 5,
@@ -745,6 +757,48 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 4,
 			expectedReady:      ptr.To[int32](0),
 		},
+		"indexed job failed": {
+			parallelism:    2,
+			completions:    3,
+			backoffLimit:   0,
+			completionMode: batch.IndexedCompletion,
+			podsWithIndexes: []indexPhase{
+				{"0", v1.PodSucceeded},
+				{"1", v1.PodFailed},
+				{"2", v1.PodRunning},
+			},
+			expectedSucceeded:       1,
+			expectedFailed:          2,
+			expectedCompletedIdxs:   "0",
+			expectedCondition:       &jobConditionFailed,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "BackoffLimitExceeded",
+			expectedPodPatches:      3,
+			expectedReady:           ptr.To[int32](0),
+			expectedDeletions:       1,
+		},
+		"count terminating pods when indexed job fails and PodReplacementPolicy enabled": {
+			parallelism:    2,
+			completions:    3,
+			backoffLimit:   0,
+			completionMode: batch.IndexedCompletion,
+			podsWithIndexes: []indexPhase{
+				{"0", v1.PodSucceeded},
+				{"1", v1.PodFailed},
+				{"2", v1.PodRunning},
+			},
+			jobPodReplacementPolicy: true,
+			expectedSucceeded:       1,
+			expectedFailed:          2,
+			expectedCompletedIdxs:   "0",
+			expectedCondition:       &jobConditionFailed,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "BackoffLimitExceeded",
+			expectedPodPatches:      3,
+			expectedReady:           ptr.To[int32](0),
+			expectedDeletions:       1,
+			expectedTerminating:     ptr.To[int32](1),
+		},
 		"indexed job repeated completed index": {
 			parallelism: 2,
 			completions: 3,
@@ -888,6 +942,25 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 2,
 			expectedReady:      ptr.To[int32](0),
 		},
+		"suspending a job with satisfied expectations; PodReplacementPolicy enabled": {
+			// Suspended Job should delete active pods when expectations are
+			// satisfied.
+			suspend:                 true,
+			parallelism:             2,
+			activePods:              2, // parallelism == active, expectations satisfied
+			completions:             4,
+			backoffLimit:            6,
+			jobPodReplacementPolicy: true,
+			expectedCreations:       0,
+			expectedDeletions:       2,
+			expectedActive:          0,
+			expectedCondition:       &jobConditionSuspended,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "JobSuspended",
+			expectedPodPatches:      2,
+			expectedReady:           ptr.To[int32](0),
+			expectedTerminating:     ptr.To[int32](2),
+		},
 		"suspending a job with unsatisfied expectations": {
 			// Unlike the previous test, we expect the controller to NOT suspend the
 			// Job in the syncJob call because the controller will wait for
@@ -904,6 +977,20 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedActive: 3,
 			expectedReady:  ptr.To[int32](0),
 		},
+		"suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": {
+			suspend:                   true,
+			parallelism:               2,
+			activePods:                3,  // active > parallelism, expectations unsatisfied
+			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
+			completions:               4,
+			backoffLimit:              6,
+			jobPodReplacementPolicy:   true,
+			expectedCreations:         0,
+			expectedDeletions:         0,
+			expectedActive:            3,
+			expectedReady:             ptr.To[int32](0),
+			expectedTerminating:       ptr.To[int32](0),
+		},
 		"resuming a suspended job": {
 			wasSuspended: true,
 			suspend:      false,
@@ -935,6 +1022,21 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedPodPatches: 2,
 			expectedReady:      ptr.To[int32](0),
 		},
+		"suspending a deleted job; PodReplacementPolicy enabled": {
+			suspend:                 true,
+			deleting:                true,
+			parallelism:             2,
+			activePods:              2, // parallelism == active, expectations satisfied
+			completions:             4,
+			backoffLimit:            6,
+			jobPodReplacementPolicy: true,
+			expectedCreations:       0,
+			expectedDeletions:       0,
+			expectedActive:          2,
+			expectedPodPatches:      2,
+			expectedReady:           ptr.To[int32](0),
+			expectedTerminating:     ptr.To[int32](0),
+		},
 		"indexed job with podIndexLabel feature disabled": {
 			parallelism: 2,
 			completions: 5,
@@ -3604,15 +3706,17 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 	}
 
 	testCases := map[string]struct {
-		enableJobFailurePolicy     bool
-		enableBackoffLimitPerIndex bool
-		enableJobSuccessPolicy     bool
-		job                        batch.Job
-		pods                       []v1.Pod
-		wantStatus                 batch.JobStatus
+		enableJobFailurePolicy        bool
+		enableBackoffLimitPerIndex    bool
+		enableJobSuccessPolicy        bool
+		enableJobPodReplacementPolicy bool
+		job                           batch.Job
+		pods                          []v1.Pod
+		wantStatus                    batch.JobStatus
 	}{
-		"job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
-			enableJobSuccessPolicy: true,
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3659,9 +3763,58 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
-			enableJobSuccessPolicy: true,
-			enableJobFailurePolicy: true,
+		"job with successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: false,
+			job: batch.Job{
+				TypeMeta:   validTypeMeta,
+				ObjectMeta: validObjectMeta,
+				Spec: batch.JobSpec{
+					Selector:       validSelector,
+					Template:       validTemplate,
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Parallelism:    ptr.To[int32](3),
+					Completions:    ptr.To[int32](3),
+					BackoffLimit:   ptr.To[int32](math.MaxInt32),
+					SuccessPolicy: &batch.SuccessPolicy{
+						Rules: []batch.SuccessPolicyRule{{
+							SucceededIndexes: ptr.To("0,1"),
+							SucceededCount:   ptr.To[int32](1),
+						}},
+					},
+				},
+			},
+			pods: []v1.Pod{
+				*buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
+				*buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
+				*buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
+				*buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod,
+			},
+			wantStatus: batch.JobStatus{
+				Failed:                  1,
+				Succeeded:               1,
+				CompletedIndexes:        "1",
+				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
+				Conditions: []batch.JobCondition{
+					{
+						Type:    batch.JobSuccessCriteriaMet,
+						Status:  v1.ConditionTrue,
+						Reason:  batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+					{
+						Type:    batch.JobComplete,
+						Status:  v1.ConditionTrue,
+						Reason:  batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+				},
+			},
+		},
+		"job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
+			enableJobSuccessPolicy:        true,
+			enableJobFailurePolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3716,9 +3869,67 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": {
-			enableJobSuccessPolicy:     true,
-			enableBackoffLimitPerIndex: true,
+		"job with podFailurePolicy and successPolicy; jobPodReplacementPolicy feature disabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
+			enableJobSuccessPolicy:        true,
+			enableJobFailurePolicy:        true,
+			enableJobPodReplacementPolicy: false,
+			job: batch.Job{
+				TypeMeta:   validTypeMeta,
+				ObjectMeta: validObjectMeta,
+				Spec: batch.JobSpec{
+					Selector:       validSelector,
+					Template:       validTemplate,
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Parallelism:    ptr.To[int32](2),
+					Completions:    ptr.To[int32](2),
+					BackoffLimit:   ptr.To[int32](math.MaxInt32),
+					SuccessPolicy: &batch.SuccessPolicy{
+						Rules: []batch.SuccessPolicyRule{{
+							SucceededIndexes: ptr.To("0,1"),
+							SucceededCount:   ptr.To[int32](1),
+						}},
+					},
+					PodFailurePolicy: &batch.PodFailurePolicy{
+						Rules: []batch.PodFailurePolicyRule{{
+							Action: batch.PodFailurePolicyActionFailJob,
+							OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
+								Type:   v1.DisruptionTarget,
+								Status: v1.ConditionTrue,
+							}},
+						}},
+					},
+				},
+			},
+			pods: []v1.Pod{
+				*buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
+				*buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
+				*buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
+			},
+			wantStatus: batch.JobStatus{
+				Failed:                  1,
+				Succeeded:               1,
+				CompletedIndexes:        "1",
+				UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
+				Conditions: []batch.JobCondition{
+					{
+						Type:    batch.JobSuccessCriteriaMet,
+						Status:  v1.ConditionTrue,
+						Reason:  batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+					{
+						Type:    batch.JobComplete,
+						Status:  v1.ConditionTrue,
+						Reason:  batch.JobReasonSuccessPolicy,
+						Message: "Matched rules at index 0",
+					},
+				},
+			},
+		},
+		"job with backoffLimitPerIndex and successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": {
+			enableJobSuccessPolicy:        true,
+			enableBackoffLimitPerIndex:    true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3766,8 +3977,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": {
-			enableJobSuccessPolicy: true,
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3828,8 +4040,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 		// So, we need to revisit here before we graduate the JobSuccessPolicy to beta.
 		// TODO(#123775): A Job might finish with ready!=0
 		// REF: https://github.com/kubernetes/kubernetes/issues/123775
-		"job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
-			enableJobSuccessPolicy: true,
+		"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3887,9 +4100,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": {
-			enableJobSuccessPolicy: true,
-			enableJobFailurePolicy: true,
+		"job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and podFailurePolicy": {
+			enableJobSuccessPolicy:        true,
+			enableJobFailurePolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3950,9 +4164,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": {
-			enableJobSuccessPolicy:     true,
-			enableBackoffLimitPerIndex: true,
+		"job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
+			enableBackoffLimitPerIndex:    true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -3993,8 +4208,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": {
-			enableJobSuccessPolicy: true,
+		"job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job has a failed condition when job meets to both successPolicy and backoffLimit": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4034,9 +4250,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": {
-			enableJobSuccessPolicy: true,
-			enableJobFailurePolicy: true,
+		"job with successPolicy and podFailurePolicy; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": {
+			enableJobSuccessPolicy:        true,
+			enableJobFailurePolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4110,9 +4327,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": {
-			enableJobSuccessPolicy:     true,
-			enableBackoffLimitPerIndex: true,
+		"job with successPolicy and backoffLimitPerIndex; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
+			enableBackoffLimitPerIndex:    true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4173,8 +4391,9 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": {
-			enableJobSuccessPolicy: true,
+		"job with successPolicy and backoffLimit; jobPodReplacementPolicy feature enabled; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": {
+			enableJobSuccessPolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4234,9 +4453,10 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 				},
 			},
 		},
-		"job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": {
-			enableJobSuccessPolicy: true,
-			enableJobFailurePolicy: true,
+		"job with successPolicy and podFailureTarget; jobPodReplacementPolicy feature enabled; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": {
+			enableJobSuccessPolicy:        true,
+			enableJobFailurePolicy:        true,
+			enableJobPodReplacementPolicy: true,
 			job: batch.Job{
 				TypeMeta:   validTypeMeta,
 				ObjectMeta: validObjectMeta,
@@ -4317,6 +4537,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy)
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)
 			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
+			featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
 
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			fakeClock := clocktesting.NewFakeClock(now)