From 594490fd77b838f37dff292f55b4cb10d0f57ca3 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Sun, 14 Jul 2024 08:25:31 +0900 Subject: [PATCH 1/4] Job: Add the CompletionsReached reason to the SuccessCriteriaMet condition Signed-off-by: Yuki Iwai --- pkg/controller/job/job_controller.go | 7 +- pkg/controller/job/job_controller_test.go | 39 +++++++++ staging/src/k8s.io/api/batch/v1/types.go | 5 ++ test/e2e/apps/job.go | 2 +- test/integration/job/job_test.go | 96 ++++++++++++++++++++++- 5 files changed, 143 insertions(+), 6 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 9f250c9a344..2cd1705debb 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -988,7 +988,12 @@ func (jm *Controller) newSuccessCondition() *batch.JobCondition { if delayTerminalCondition() { cType = batch.JobSuccessCriteriaMet } - return newCondition(cType, v1.ConditionTrue, "", "", jm.clock.Now()) + var reason, message string + if feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) { + reason = batch.JobReasonCompletionsReached + message = "Reached expected number of succeeded pods" + } + return newCondition(cType, v1.ConditionTrue, reason, message, jm.clock.Now()) } func delayTerminalCondition() bool { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 61ad09c16c4..b79226a9c39 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -4991,6 +4991,45 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, + "job without successPolicy; jobSuccessPolicy is enabled; job got SuccessCriteriaMet and Completion with CompletionsReached reason conditions": { + enableJobSuccessPolicy: true, + enableJobManagedBy: true, + job: batch.Job{ + TypeMeta: validTypeMeta, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + CompletionMode: ptr.To(batch.IndexedCompletion), + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + BackoffLimit: ptr.To[int32](math.MaxInt32), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a1").index("0").phase(v1.PodSucceeded).trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 0, + Succeeded: 1, + CompletedIndexes: "0", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", + }, + }, + }, + }, "when the JobSuccessPolicy is disabled, the Job never got SuccessCriteriaMet condition even if the Job has the successPolicy field": { job: batch.Job{ TypeMeta: validTypeMeta, diff --git a/staging/src/k8s.io/api/batch/v1/types.go b/staging/src/k8s.io/api/batch/v1/types.go index 82a5c8fdd0c..d91435b84d2 100644 --- a/staging/src/k8s.io/api/batch/v1/types.go +++ b/staging/src/k8s.io/api/batch/v1/types.go @@ -651,6 +651,11 @@ const ( // https://kep.k8s.io/3998 // This is currently an alpha field. JobReasonSuccessPolicy string = "SuccessPolicy" + // JobReasonCompletionsReached reason indicates a SuccessCriteriaMet condition is added due to + // a number of succeeded Job pods met completions. 
+ // - https://kep.k8s.io/3998 + // This is currently a beta field. + JobReasonCompletionsReached string = "CompletionsReached" ) // JobCondition describes current state of a job. diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index db0449e69e6..5dc334b4ebd 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -710,7 +710,7 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Awaiting for the job to have the interim success condition") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, batchv1.JobReasonCompletionsReached) framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 0420a255737..0e3a17b7466 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -511,10 +511,10 @@ func TestSuccessPolicy(t *testing.T) { testCases := map[string]struct { enableJobSuccessPolicy bool enableBackoffLimitPerIndex bool - job batchv1.Job - podTerminations []podTerminationWithExpectations - wantConditionTypes []batchv1.JobConditionType - wantJobFinishedNumMetric []metricLabelsWithValue + job batchv1.Job + podTerminations []podTerminationWithExpectations + wantConditionTypes []batchv1.JobConditionType + wantJobFinishedNumMetric []metricLabelsWithValue }{ "all indexes succeeded; JobSuccessPolicy is enabled": { enableJobSuccessPolicy: true, @@ -1185,6 +1185,7 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { testCases := map[string]struct { enableJobManagedBy bool enableJobPodReplacementPolicy bool + enableJobSuccessPolicy bool job batchv1.Job action func(context.Context, clientset.Interface, *batchv1.Job) @@ -1393,6 +1394,92 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { }, }, }, + "job scale down to meet completions; JobManagedBy and JobSuccessPolicy are enabled": { + enableJobManagedBy: true, + enableJobSuccessPolicy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + action: succeedOnePodAndScaleDown, + wantInterimStatus: &batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + { + Type: batchv1.JobComplete, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + }, + }, + }, + "job scale down to meet completions; JobPodReplacementPolicy and JobSuccessPolicy are enabled": { + enableJobPodReplacementPolicy: true, + enableJobSuccessPolicy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + action: succeedOnePodAndScaleDown, + 
wantInterimStatus: &batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](1), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + { + Type: batchv1.JobComplete, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonCompletionsReached, + }, + }, + }, + }, } for name, test := range testCases { t.Run(name, func(t *testing.T) { @@ -1400,6 +1487,7 @@ func TestDelayTerminalPhaseCondition(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, test.enableJobSuccessPolicy) closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition") t.Cleanup(closeFn) From 6e8dc2c2501f6004dbe42b099d6b9c991f2ec9bc Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Sun, 14 Jul 2024 09:19:58 +0900 Subject: [PATCH 2/4] Job: Extend the jobs_finished_total metric reason label with SuccessPolicy and CompletionsReached Signed-off-by: Yuki Iwai --- pkg/controller/job/job_controller.go | 2 +- pkg/controller/job/metrics/metrics.go | 8 +++-- test/integration/job/job_test.go | 45 ++++++++++++++++++++++----- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 2cd1705debb..28d713a3a87 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -1424,7 +1424,7 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") } jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed") - metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", "").Inc() + metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", finishedCond.Reason).Inc() } else { jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message) metrics.JobFinishedNum.WithLabelValues(completionMode, "failed", finishedCond.Reason).Inc() diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index 2a184ad2721..a52067ada7f 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -55,12 +55,14 @@ var ( }, []string{"completion_mode", "result", "action"}, ) - // JobFinishedNum tracks the number of Jobs that finish. Empty reason label - // is used to count successful jobs. + // JobFinishedNum tracks the number of Jobs that finish. + // TODO: Once we remove the JobSuccessPolicy feature gate, we need to remove "" reason label comment. + // When the JobSuccessPolicy feature gate is disabled, empty reason label is used to count successful jobs. 
+ // Otherwise, "CompletionsReached" reason label is used to count successful jobs. // Possible label values: // completion_mode: Indexed, NonIndexed // result: failed, succeeded - // reason: "BackoffLimitExceeded", "DeadlineExceeded", "PodFailurePolicy", "FailedIndexes", "MaxFailedIndexesExceeded", "" + // reason: "BackoffLimitExceeded", "DeadlineExceeded", "PodFailurePolicy", "FailedIndexes", "MaxFailedIndexesExceeded", "SuccessPolicy", "CompletionsReached", "" JobFinishedNum = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: JobControllerSubsystem, diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 0e3a17b7466..9908f6ca882 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -511,10 +511,10 @@ func TestSuccessPolicy(t *testing.T) { testCases := map[string]struct { enableJobSuccessPolicy bool enableBackoffLimitPerIndex bool - job batchv1.Job - podTerminations []podTerminationWithExpectations - wantConditionTypes []batchv1.JobConditionType - wantJobFinishedNumMetric []metricLabelsWithValue + job batchv1.Job + podTerminations []podTerminationWithExpectations + wantConditionTypes []batchv1.JobConditionType + wantJobFinishedNumMetric []metricLabelsWithValue }{ "all indexes succeeded; JobSuccessPolicy is enabled": { enableJobSuccessPolicy: true, @@ -547,7 +547,7 @@ func TestSuccessPolicy(t *testing.T) { wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, wantJobFinishedNumMetric: []metricLabelsWithValue{ { - Labels: []string{"Indexed", "succeeded", ""}, + Labels: []string{"Indexed", "succeeded", "SuccessPolicy"}, Value: 1, }, }, @@ -587,6 +587,37 @@ func TestSuccessPolicy(t *testing.T) { }, }, }, + "job without successPolicy; incremented the jobs_finished_total metric with CompletionsReached reason": { + enableJobSuccessPolicy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + index: 0, + status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + wantActive: 0, + wantFailed: 0, + wantSucceeded: 1, + wantCompletedIndexes: "0", + wantTerminating: ptr.To[int32](0), + }, + }, + wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, + wantJobFinishedNumMetric: []metricLabelsWithValue{ + { + Labels: []string{"Indexed", "succeeded", "CompletionsReached"}, + Value: 1, + }, + }, + }, "job with successPolicy with succeededIndexes; job has SuccessCriteriaMet and Complete conditions even if some indexes remain pending": { enableJobSuccessPolicy: true, job: batchv1.Job{ @@ -629,7 +660,7 @@ func TestSuccessPolicy(t *testing.T) { wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, wantJobFinishedNumMetric: []metricLabelsWithValue{ { - Labels: []string{"Indexed", "succeeded", ""}, + Labels: []string{"Indexed", "succeeded", "SuccessPolicy"}, Value: 1, }, }, @@ -676,7 +707,7 @@ func TestSuccessPolicy(t *testing.T) { wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete}, wantJobFinishedNumMetric: []metricLabelsWithValue{ { - Labels: []string{"Indexed", "succeeded", ""}, + Labels: []string{"Indexed", "succeeded", "SuccessPolicy"}, Value: 1, }, }, From 551931c6a85f1b319608559267cf118d266c643d Mon Sep 17 00:00:00 2001 From: Yuki Iwai 
Date: Sat, 13 Jul 2024 03:00:18 +0900 Subject: [PATCH 3/4] Graduate the JobSuccessPolicy to beta Signed-off-by: Yuki Iwai --- api/openapi-spec/swagger.json | 2 +- .../v3/apis__batch__v1_openapi.json | 2 +- pkg/apis/batch/types.go | 4 +- pkg/controller/job/job_controller_test.go | 42 ++++++++++++------- pkg/features/kube_features.go | 3 +- pkg/generated/openapi/zz_generated.openapi.go | 2 +- .../src/k8s.io/api/batch/v1/generated.proto | 4 +- staging/src/k8s.io/api/batch/v1/types.go | 6 +-- .../batch/v1/types_swagger_doc_generated.go | 2 +- test/integration/job/job_test.go | 14 +++---- 10 files changed, 48 insertions(+), 33 deletions(-) diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index b0c21560dd6..0bc7a1e91b3 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -4845,7 +4845,7 @@ }, "successPolicy": { "$ref": "#/definitions/io.k8s.api.batch.v1.SuccessPolicy", - "description": "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is alpha-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (disabled by default)." + "description": "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is beta-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (enabled by default)." }, "suspend": { "description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.", diff --git a/api/openapi-spec/v3/apis__batch__v1_openapi.json b/api/openapi-spec/v3/apis__batch__v1_openapi.json index c985a3945a6..2bd9d655ac1 100644 --- a/api/openapi-spec/v3/apis__batch__v1_openapi.json +++ b/api/openapi-spec/v3/apis__batch__v1_openapi.json @@ -388,7 +388,7 @@ "$ref": "#/components/schemas/io.k8s.api.batch.v1.SuccessPolicy" } ], - "description": "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is alpha-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (disabled by default)." + "description": "successPolicy specifies the policy when the Job can be declared as succeeded. 
If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is beta-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (enabled by default)." }, "suspend": { "description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.", diff --git a/pkg/apis/batch/types.go b/pkg/apis/batch/types.go index a76fa859dd9..19ca116208b 100644 --- a/pkg/apis/batch/types.go +++ b/pkg/apis/batch/types.go @@ -339,8 +339,8 @@ type JobSpec struct { // When the field is specified, it must be immutable and works only for the Indexed Jobs. // Once the Job meets the SuccessPolicy, the lingering pods are terminated. // - // This field is alpha-level. To use this field, you must enable the - // `JobSuccessPolicy` feature gate (disabled by default). + // This field is beta-level. To use this field, you must enable the + // `JobSuccessPolicy` feature gate (enabled by default). // +optional SuccessPolicy *SuccessPolicy diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index b79226a9c39..49e814c1d8f 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -5171,12 +5171,16 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { - Type: batch.JobSuccessCriteriaMet, - Status: v1.ConditionTrue, + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, { - Type: batch.JobComplete, - Status: v1.ConditionTrue, + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, }, }, @@ -7105,8 +7109,10 @@ func TestJobBackoffForOnFailure(t *testing.T) { expectedFailed: 0, expectedConditions: []batch.JobCondition{ { - Type: batch.JobComplete, - Status: v1.ConditionTrue, + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, }, }, @@ -7124,12 +7130,16 @@ func TestJobBackoffForOnFailure(t *testing.T) { expectedFailed: 0, expectedConditions: []batch.JobCondition{ { - Type: batch.JobSuccessCriteriaMet, - Status: v1.ConditionTrue, + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, { - Type: batch.JobComplete, - Status: v1.ConditionTrue, + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, }, }, @@ -7147,12 +7157,16 @@ func TestJobBackoffForOnFailure(t *testing.T) { expectedFailed: 0, 
expectedConditions: []batch.JobCondition{ { - Type: batch.JobSuccessCriteriaMet, - Status: v1.ConditionTrue, + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, { - Type: batch.JobComplete, - Status: v1.ConditionTrue, + Type: batch.JobComplete, + Status: v1.ConditionTrue, + Reason: batch.JobReasonCompletionsReached, + Message: "Reached expected number of succeeded pods", }, }, }, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 93c98a97cd5..9c6f07a826b 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -369,6 +369,7 @@ const ( // owner: @tenzen-y // kep: https://kep.k8s.io/3998 // alpha: v1.30 + // beta: v1.31 // // Allow users to specify when a Job can be declared as succeeded // based on the set of succeeded pods. @@ -1094,7 +1095,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS JobPodReplacementPolicy: {Default: true, PreRelease: featuregate.Beta}, - JobSuccessPolicy: {Default: false, PreRelease: featuregate.Alpha}, + JobSuccessPolicy: {Default: true, PreRelease: featuregate.Beta}, KubeletCgroupDriverFromCRI: {Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 4a5e35e48fe..ba21737ce91 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -17224,7 +17224,7 @@ func schema_k8sio_api_batch_v1_JobSpec(ref common.ReferenceCallback) common.Open }, "successPolicy": { SchemaProps: spec.SchemaProps{ - Description: "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is alpha-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (disabled by default).", + Description: "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is beta-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (enabled by default).", Ref: ref("k8s.io/api/batch/v1.SuccessPolicy"), }, }, diff --git a/staging/src/k8s.io/api/batch/v1/generated.proto b/staging/src/k8s.io/api/batch/v1/generated.proto index 68c57364acd..f5a9385f5e0 100644 --- a/staging/src/k8s.io/api/batch/v1/generated.proto +++ b/staging/src/k8s.io/api/batch/v1/generated.proto @@ -222,8 +222,8 @@ message JobSpec { // When the field is specified, it must be immutable and works only for the Indexed Jobs. // Once the Job meets the SuccessPolicy, the lingering pods are terminated. // - // This field is alpha-level. To use this field, you must enable the - // `JobSuccessPolicy` feature gate (disabled by default). + // This field is beta-level. To use this field, you must enable the + // `JobSuccessPolicy` feature gate (enabled by default). 
// +optional optional SuccessPolicy successPolicy = 16; diff --git a/staging/src/k8s.io/api/batch/v1/types.go b/staging/src/k8s.io/api/batch/v1/types.go index d91435b84d2..b42ec231e45 100644 --- a/staging/src/k8s.io/api/batch/v1/types.go +++ b/staging/src/k8s.io/api/batch/v1/types.go @@ -347,8 +347,8 @@ type JobSpec struct { // When the field is specified, it must be immutable and works only for the Indexed Jobs. // Once the Job meets the SuccessPolicy, the lingering pods are terminated. // - // This field is alpha-level. To use this field, you must enable the - // `JobSuccessPolicy` feature gate (disabled by default). + // This field is beta-level. To use this field, you must enable the + // `JobSuccessPolicy` feature gate (enabled by default). // +optional SuccessPolicy *SuccessPolicy `json:"successPolicy,omitempty" protobuf:"bytes,16,opt,name=successPolicy"` @@ -649,7 +649,7 @@ const ( // JobReasonSuccessPolicy reason indicates a SuccessCriteriaMet condition is added due to // a Job met successPolicy. // https://kep.k8s.io/3998 - // This is currently an alpha field. + // This is currently a beta field. JobReasonSuccessPolicy string = "SuccessPolicy" // JobReasonCompletionsReached reason indicates a SuccessCriteriaMet condition is added due to // a number of succeeded Job pods met completions. diff --git a/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go b/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go index 0ffdcadd86d..d504887884a 100644 --- a/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go +++ b/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go @@ -116,7 +116,7 @@ var map_JobSpec = map[string]string{ "completions": "Specifies the desired number of successfully finished pods the job should be run with. Setting to null means that the success of any pod signals the success of all pods, and allows parallelism to have any positive value. Setting to 1 means that parallelism is limited to 1 and the success of that pod signals the success of the job. More info: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/", "activeDeadlineSeconds": "Specifies the duration in seconds relative to the startTime that the job may be continuously active before the system tries to terminate it; value must be positive integer. If a Job is suspended (at creation or through an update), this timer will effectively be stopped and reset when the Job is resumed again.", "podFailurePolicy": "Specifies the policy of handling failed pods. In particular, it allows to specify the set of actions and conditions which need to be satisfied to take the associated action. If empty, the default behaviour applies - the counter of failed pods, represented by the jobs's .status.failed field, is incremented and it is checked against the backoffLimit. This field cannot be used in combination with restartPolicy=OnFailure.", - "successPolicy": "successPolicy specifies the policy when the Job can be declared as succeeded. If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is alpha-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (disabled by default).", + "successPolicy": "successPolicy specifies the policy when the Job can be declared as succeeded. 
If empty, the default behavior applies - the Job is declared as succeeded only when the number of succeeded pods equals to the completions. When the field is specified, it must be immutable and works only for the Indexed Jobs. Once the Job meets the SuccessPolicy, the lingering pods are terminated.\n\nThis field is beta-level. To use this field, you must enable the `JobSuccessPolicy` feature gate (enabled by default).", "backoffLimit": "Specifies the number of retries before marking this job failed. Defaults to 6", "backoffLimitPerIndex": "Specifies the limit for the number of retries within an index before marking this index as failed. When enabled the number of failures per index is kept in the pod's batch.kubernetes.io/job-index-failure-count annotation. It can only be set when Job's completionMode=Indexed, and the Pod's restart policy is Never. The field is immutable. This field is beta-level. It can be used when the `JobBackoffLimitPerIndex` feature gate is enabled (enabled by default).", "maxFailedIndexes": "Specifies the maximal number of failed indexes before marking the Job as failed, when backoffLimitPerIndex is set. Once the number of failed indexes exceeds this number the entire Job is marked as Failed and its execution is terminated. When left as null the job continues execution of all of its indexes and is marked with the `Complete` Job condition. It can only be specified when backoffLimitPerIndex is set. It can be null or up to completions. It is required and must be less than or equal to 10^4 when is completions greater than 10^5. This field is beta-level. It can be used when the `JobBackoffLimitPerIndex` feature gate is enabled (enabled by default).", diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 9908f6ca882..39955ca8471 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -389,7 +389,7 @@ func TestJobPodFailurePolicy(t *testing.T) { Value: 1, }, wantJobFinishedMetric: metricLabelsWithValue{ - Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }, }, @@ -400,7 +400,7 @@ func TestJobPodFailurePolicy(t *testing.T) { wantFailed: 1, wantJobConditionType: batchv1.JobComplete, wantJobFinishedMetric: metricLabelsWithValue{ - Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }, wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{ @@ -415,7 +415,7 @@ func TestJobPodFailurePolicy(t *testing.T) { wantFailed: 1, wantJobConditionType: batchv1.JobComplete, wantJobFinishedMetric: metricLabelsWithValue{ - Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }, wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{ @@ -2474,7 +2474,7 @@ func TestNonParallelJob(t *testing.T) { }) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{ - Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }) validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{ @@ -2562,7 +2562,7 @@ func TestParallelJob(t *testing.T) { validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7) validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{ 
- Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }) validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{ @@ -2704,7 +2704,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobPodsStatus(ctx, t, clientSet, jobObj, want) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{ - Labels: []string{"NonIndexed", "succeeded", ""}, + Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"}, Value: 1, }) validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{ @@ -2800,7 +2800,7 @@ func TestIndexedJob(t *testing.T) { Value: 4, }) validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{ - Labels: []string{"Indexed", "succeeded", ""}, + Labels: []string{"Indexed", "succeeded", "CompletionsReached"}, Value: 1, }) validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{ From 0d4f18bd5b8e05211e985f31781cced9aac42e78 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Sat, 13 Jul 2024 03:00:18 +0900 Subject: [PATCH 4/4] Job: Implement E2E tests for the JobSuccessPolicy Signed-off-by: Yuki Iwai --- test/e2e/apps/job.go | 148 +++++++++++++++++++++++++++++---- test/e2e/framework/job/wait.go | 21 +++-- 2 files changed, 148 insertions(+), 21 deletions(-) diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 5dc334b4ebd..b09d70be995 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "math" "strconv" batchv1 "k8s.io/api/batch/v1" @@ -81,7 +82,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring pods for job exist") @@ -173,7 +174,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) @@ -253,7 +254,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) @@ -342,7 +343,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, 
f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) @@ -374,7 +375,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name) ginkgo.By("Waiting for job to complete") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) @@ -499,7 +500,7 @@ done`} framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, nil, completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring pods with index for job exist") @@ -520,6 +521,123 @@ done`} gomega.Expect(gotIndexes).To(gomega.Equal(wantIndexes), "expected completed indexes %s, but got %s", wantIndexes, gotIndexes) }) + /* + Testcase: Ensure that job with successPolicy succeeded when all indexes succeeded + Description: Create an indexed job with successPolicy. + Verify that job got SuccessCriteriaMet with SuccessPolicy reason and Complete condition + once all indexes succeeded. + */ + ginkgo.It("with successPolicy should succeeded when all indexes succeeded", func(ctx context.Context) { + parallelism := int32(2) + completions := int32(2) + backoffLimit := int32(6) // default value + + ginkgo.By("Creating an indexed job with successPolicy") + job := e2ejob.NewTestJob("succeeded", "with-success-policy-all-index-succeeded", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) + job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion) + job.Spec.SuccessPolicy = &batchv1.SuccessPolicy{ + Rules: []batchv1.SuccessPolicyRule{{ + SucceededCount: ptr.To[int32](2), + }}, + } + job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Awaiting for the job to have the interim SuccessCriteriaMet with SuccessPolicy reason condition") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, ptr.To(batchv1.JobReasonSuccessPolicy)) + framework.ExpectNoError(err, "failed to ensure that job has SuccessCriteriaMet with SuccessPolicy reason condition") + + ginkgo.By("Ensure that the job reaches completions") + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonSuccessPolicy), completions) + framework.ExpectNoError(err, "failed to ensure that job completed") + + ginkgo.By("Verifying that the job status to ensure correct final state") + job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to get latest job object") + gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0))) + gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0))) + 
gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
+		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(0)))
+	})
+
+	/*
+		Testcase: Ensure that job with successPolicy succeededIndexes rule succeeded even when some indexes remain pending
+		Description: Create an indexed job with successPolicy succeededIndexes rule.
+		Verify that the job got SuccessCriteriaMet with SuccessPolicy reason condition and Complete condition
+		when the job met successPolicy even if some indexes remain pending.
+	*/
+	ginkgo.It("with successPolicy succeededIndexes rule should succeed even when some indexes remain pending", func(ctx context.Context) {
+		parallelism := int32(2)
+		completions := int32(5)
+		backoffLimit := int32(6) // default value
+
+		ginkgo.By("Creating an indexed job with successPolicy succeededIndexes rule")
+		job := e2ejob.NewTestJob("failOddSucceedEven", "with-success-policy-succeeded-indexed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
+		job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion)
+		job.Spec.SuccessPolicy = &batchv1.SuccessPolicy{
+			Rules: []batchv1.SuccessPolicyRule{{
+				SucceededIndexes: ptr.To("0"),
+			}},
+		}
+		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Awaiting for the job to have the interim SuccessCriteriaMet with SuccessPolicy reason condition")
+		err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, ptr.To(batchv1.JobReasonSuccessPolicy))
+		framework.ExpectNoError(err, "failed to ensure that job has SuccessCriteriaMet with SuccessPolicy reason condition")
+
+		ginkgo.By("Ensure that the job reaches completions")
+		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonSuccessPolicy), 1)
+		framework.ExpectNoError(err, "failed to ensure that job completed")
+
+		ginkgo.By("Verifying that only the appropriate index succeeded")
+		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
+		framework.ExpectNoError(err, "failed to get the latest job object")
+		gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0"))
+		gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0)))
+		gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0)))
+		gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0)))
+	})
+
+	/*
+		Testcase: Ensure that job with successPolicy succeededCount rule succeeded even when some indexes remain pending
+		Description: Create an indexed job with successPolicy succeededCount rule.
+		Verify that the job got the SuccessCriteriaMet with SuccessPolicy reason condition and Complete condition
+		when the job met successPolicy even if some indexes remain pending.
+ */ + ginkgo.It("with successPolicy succeededCount rule should succeeded even when some indexes remain pending", func(ctx context.Context) { + parallelism := int32(2) + completions := int32(5) + backoffLimit := int32(math.MaxInt32) + + ginkgo.By("Creating an indexed job with successPolicy succeededCount rule") + job := e2ejob.NewTestJob("failOddSucceedEven", "with-success-policy-succeeded-count", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) + job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion) + job.Spec.SuccessPolicy = &batchv1.SuccessPolicy{ + Rules: []batchv1.SuccessPolicyRule{{ + SucceededCount: ptr.To[int32](1), + }}, + } + job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Awaiting for the job to have the interim SuccessCriteriaMet condition with SuccessPolicy reason") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, ptr.To(batchv1.JobReasonSuccessPolicy)) + framework.ExpectNoError(err, "failed to ensure that the job has SuccessCriteriaMet condition with SuccessPolicy rule") + + ginkgo.By("Ensure that the job reaches completions") + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonSuccessPolicy), 1) + framework.ExpectNoError(err, "failed to ensure that job completed") + + ginkgo.By("Verifying that the job status to ensure correct final state") + job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to get the latest job object") + gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0")) + gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0))) + gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0))) + gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0))) + }) + /* Testcase: Ensure that all indexes are executed for an indexed job with backoffLimitPerIndex despite some failing Description: Create an indexed job and ensure that all indexes are either failed or succeeded, depending @@ -683,7 +801,7 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, nil, completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) @@ -710,11 +828,11 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Awaiting for the job to have the interim success condition") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, batchv1.JobReasonCompletionsReached) + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, ptr.To(batchv1.JobReasonCompletionsReached)) framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), 
*job.Spec.Completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) ginkgo.By("Verifying the Job status fields to ensure correct final state") @@ -737,11 +855,11 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Awaiting for the job to have the interim failure condition") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonDeadlineExceeded) + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, ptr.To(batchv1.JobReasonDeadlineExceeded)) framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name) ginkgo.By("Ensuring job past active deadline") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonDeadlineExceeded) + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, ptr.To(batchv1.JobReasonDeadlineExceeded)) framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name) ginkgo.By("Verifying the Job status fields to ensure correct final state") @@ -852,11 +970,11 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Awaiting for the job to have the interim failure condition") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonBackoffLimitExceeded) + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, ptr.To(batchv1.JobReasonBackoffLimitExceeded)) framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name) ginkgo.By("Ensuring job exceed backofflimit") - err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonBackoffLimitExceeded) + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, ptr.To(batchv1.JobReasonBackoffLimitExceeded)) framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name) ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1)) @@ -909,7 +1027,7 @@ done`} framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring job reaches completions") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) ginkgo.By("Ensuring pods for job exist") @@ -1115,7 +1233,7 @@ done`} framework.Logf("Job: %v as labels: %v", testJob.Name, testJob.Labels) ginkgo.By("Waiting for job to complete") - err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, ns, jobName, completions) + err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, ns, jobName, nil, completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", ns) ginkgo.By("Delete a job collection with a labelselector") diff --git a/test/e2e/framework/job/wait.go b/test/e2e/framework/job/wait.go index 2c994bbf0e7..a7c3d265e5f 100644 --- a/test/e2e/framework/job/wait.go +++ 
b/test/e2e/framework/job/wait.go @@ -68,7 +68,14 @@ func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobNa } // WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns. -func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, completions int32) error { +// This function checks if the number of succeeded Job Pods reached expected completions and +// the Job has a "Complete" condition with the expected reason. +// The pointer "reason" argument allows us to skip "Complete" condition reason verifications. +// The conformance test cases have the different expected "Complete" condition reason ("CompletionsReached" vs "") +// between conformance CI jobs and e2e CI jobs since the e2e conformance test cases are performed in +// both conformance CI jobs with GA-only features and e2e CI jobs with all default-enabled features. +// So, we need to skip "Complete" condition reason verifications in the e2e conformance test cases. +func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, reason *string, completions int32) error { if err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) { curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) if err != nil { @@ -78,7 +85,7 @@ func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName }); err != nil { return nil } - return WaitForJobCondition(ctx, c, ns, jobName, batchv1.JobComplete, "") + return WaitForJobCondition(ctx, c, ns, jobName, batchv1.JobComplete, reason) } // WaitForJobReady waits for particular value of the Job .status.ready field @@ -115,8 +122,10 @@ func WaitForJobFailed(c clientset.Interface, ns, jobName string) error { }) } -// waitForJobCondition waits for the specified Job to have the expected condition with the specific reason. -func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason string) error { +// WaitForJobCondition waits for the specified Job to have the expected condition with the specific reason. +// When the nil reason is passed, the "reason" string in the condition is +// not checked. +func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason *string) error { err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) { curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) if err != nil { @@ -124,7 +133,7 @@ func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName } for _, c := range curr.Status.Conditions { if c.Type == cType && c.Status == v1.ConditionTrue { - if reason == c.Reason { + if reason == nil || *reason == c.Reason { return true, nil } } @@ -132,7 +141,7 @@ func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName return false, nil }) if err != nil { - return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %q: %w", jobName, cType, reason, err) + return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %v: %w", jobName, cType, reason, err) } return nil }
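Patch 1 stamps the CompletionsReached reason (and a fixed message) on the interim SuccessCriteriaMet condition, and the test expectations above show the same reason carried onto the terminal Complete condition; Jobs finished through spec.successPolicy carry SuccessPolicy instead. A minimal consumer-side sketch for reading that reason back from a Job's status; the package and helper names here are illustrative, not part of the series:

    package jobsketch

    import (
        batchv1 "k8s.io/api/batch/v1"
        v1 "k8s.io/api/core/v1"
    )

    // successReason returns the reason recorded on a finished Job's Complete condition
    // ("CompletionsReached", "SuccessPolicy", or "" while the JobSuccessPolicy gate is off)
    // and false if the Job has not completed successfully.
    func successReason(job *batchv1.Job) (string, bool) {
        for _, c := range job.Status.Conditions {
            if c.Type == batchv1.JobComplete && c.Status == v1.ConditionTrue {
                return c.Reason, true
            }
        }
        return "", false
    }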
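Patch 2 threads the finished condition's reason into jobs_finished_total, so successful Jobs are now counted under the "SuccessPolicy" or "CompletionsReached" reason label rather than "" (the empty label remains only while the JobSuccessPolicy gate is off). A sketch of asserting one label combination in a test; it assumes testutil.GetCounterMetricValue accepts the child counter returned by WithLabelValues, which is how the integration tests in this series read the metric:

    package jobsketch

    import (
        "testing"

        "k8s.io/component-base/metrics/testutil"
        "k8s.io/kubernetes/pkg/controller/job/metrics"
    )

    // expectJobsFinishedTotal asserts one sample of the jobs_finished_total counter,
    // e.g. expectJobsFinishedTotal(t, "Indexed", "succeeded", "CompletionsReached", 1).
    func expectJobsFinishedTotal(t *testing.T, completionMode, result, reason string, want float64) {
        t.Helper()
        got, err := testutil.GetCounterMetricValue(metrics.JobFinishedNum.WithLabelValues(completionMode, result, reason))
        if err != nil {
            t.Fatalf("reading jobs_finished_total: %v", err)
        }
        if got != want {
            t.Errorf("jobs_finished_total{%s,%s,%s} = %v, want %v", completionMode, result, reason, got, want)
        }
    }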
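Patch 3 flips JobSuccessPolicy to beta and on by default, so the gate check added in patch 1 now takes the CompletionsReached branch unless the gate is explicitly turned off. Tests that need the legacy behavior override the default for a single case, exactly as the updated unit and integration tests do; a sketch of that pattern, with an illustrative test name:

    package jobsketch

    import (
        "testing"

        "k8s.io/apiserver/pkg/util/feature"
        featuregatetesting "k8s.io/component-base/featuregate/testing"
        "k8s.io/kubernetes/pkg/features"
    )

    // TestLegacySuccessCondition shows the override pattern used throughout the series:
    // the beta gate defaults to enabled, so a test that needs the old behavior disables
    // it for its own duration only.
    func TestLegacySuccessCondition(t *testing.T) {
        featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, false)
        // With the gate off, newSuccessCondition (patch 1) leaves the reason and message
        // empty, and jobs_finished_total falls back to the "" reason label (patch 2).
    }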
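The e2e coverage in patch 4 builds Jobs with spec.successPolicy directly in Go. A condensed sketch of such a spec, mirroring the succeededIndexes case; the function and object names are placeholders and the pod template is left to the caller:

    package jobsketch

    import (
        batchv1 "k8s.io/api/batch/v1"
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/utils/ptr"
    )

    // successPolicyJob builds an Indexed Job that is declared succeeded as soon as
    // index 0 succeeds; the controller then terminates the lingering pods.
    func successPolicyJob(template v1.PodTemplateSpec) *batchv1.Job {
        return &batchv1.Job{
            ObjectMeta: metav1.ObjectMeta{Name: "with-success-policy"},
            Spec: batchv1.JobSpec{
                CompletionMode: ptr.To(batchv1.IndexedCompletion),
                Parallelism:    ptr.To[int32](2),
                Completions:    ptr.To[int32](5),
                SuccessPolicy: &batchv1.SuccessPolicy{
                    Rules: []batchv1.SuccessPolicyRule{{
                        SucceededIndexes: ptr.To("0"),
                    }},
                },
                Template: template,
            },
        }
    }

A Job finished through such a rule carries the SuccessPolicy reason on its SuccessCriteriaMet and Complete conditions, while a Job that runs all the way to .spec.completions carries CompletionsReached.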
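The wait.go change in patch 4 turns the expected reason into a *string, so callers can either pin the reason or skip the check entirely (the conformance cases pass nil because GA-only and all-beta CI jobs observe different reasons). A sketch of both call styles against the updated helper; the namespace and Job name are placeholders:

    package jobsketch

    import (
        "context"
        "testing"

        batchv1 "k8s.io/api/batch/v1"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/utils/ptr"

        e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
    )

    // waitStyles demonstrates the two call styles of the updated WaitForJobComplete.
    func waitStyles(ctx context.Context, t *testing.T, c clientset.Interface) {
        // Strict: the Job must reach 2 completions and carry the CompletionsReached reason.
        if err := e2ejob.WaitForJobComplete(ctx, c, "default", "example-job", ptr.To(batchv1.JobReasonCompletionsReached), 2); err != nil {
            t.Fatal(err)
        }
        // Lenient: nil skips the reason check, as the conformance cases do.
        if err := e2ejob.WaitForJobComplete(ctx, c, "default", "example-job", nil, 2); err != nil {
            t.Fatal(err)
        }
    }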