From d41302312ef0b938e7a8c97021ee47e64bc9b173 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Fri, 20 Jan 2023 23:55:54 +0000 Subject: [PATCH 1/5] update validation logic so completions is mutable iff completions is modified in tandem with parallelism so completions == parallelism --- pkg/apis/batch/validation/validation.go | 31 ++- pkg/apis/batch/validation/validation_test.go | 110 +++++++++- pkg/controller/job/job_controller_test.go | 3 +- pkg/features/kube_features.go | 11 + test/integration/job/job_test.go | 203 +++++++++++++++++++ 5 files changed, 354 insertions(+), 4 deletions(-) diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 26c365b2f98..0bac3f6fbb1 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -31,9 +31,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" ) @@ -375,7 +377,7 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList { func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { allErrs := field.ErrorList{} allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...) - allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...) + allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"))...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...) 
allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...) @@ -565,6 +567,33 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o return allErrs } +func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + + // Completions is immutable for non-indexed jobs. + // For Indexed Jobs, if ElasticIndexedJob feature gate is not enabled, + // fall back to validating that spec.Completions is always immutable. + isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion + if !isIndexedJob || !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) { + return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) + } + + // Indexed Jobs cannot set completions to nil. The nil check + // is already performed in validateJobSpec, no need to add another error. 
+ if spec.Completions == nil || oldSpec.Completions == nil { + return allErrs + } + + // For Indexed Jobs, spec.Completions can only be mutated in tandem + // with spec.Parallelism such that spec.Completions == spec.Parallelism + if *spec.Completions != *oldSpec.Completions { + if *spec.Completions != *spec.Parallelism || *oldSpec.Completions != *oldSpec.Parallelism { + allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String()))) + } + } + return allErrs +} + type JobValidationOptions struct { apivalidation.PodValidationOptions // Allow Job to have the annotation batch.kubernetes.io/job-tracking diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index b15f3bdd7b5..7055c245978 100644 --- a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -919,7 +919,7 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.ManualSelector = pointer.BoolPtr(true) }, }, - "immutable completion": { + "immutable completions for non-indexed jobs": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, Spec: batch.JobSpec{ @@ -1283,6 +1283,114 @@ func TestValidateJobUpdate(t *testing.T) { AllowMutableSchedulingDirectives: true, }, }, + "update completions and parallelism to same value is valid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(2) + }, + }, + "previous parallelism != previous completions, new parallelism == new 
completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(3) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "indexed job updating completions and parallelism to different values is invalid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "indexed job with completions set updated to nil does not panic": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = nil + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.completions", + }, + }, + "indexed job with completions unchanged, parallelism reduced to less than completions": { + old: batch.Job{ + ObjectMeta: 
metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(1) + }, + }, + "indexed job with completions unchanged, parallelism increased higher than completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + }, } ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail") for k, tc := range cases { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9f5b3d7b1f2..6cfadb3269c 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -237,8 +237,7 @@ func TestControllerSyncJob(t *testing.T) { expectedConditionStatus v1.ConditionStatus expectedConditionReason string expectedCreatedIndexes sets.Int - - expectedPodPatches int + expectedPodPatches int // features jobReadyPodsEnabled bool diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2c088b90faf..e27e12a740e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -685,6 +685,15 @@ const ( // certificate as expiration approaches. 
RotateKubeletServerCertificate featuregate.Feature = "RotateKubeletServerCertificate" + // owner: @danielvegamyhre + // kep: https://kep.k8s.io/3715 + // beta: v1.27 + // + // Allows mutating spec.completions for Indexed job when done in tandem with + // spec.parallelism. Specifically, spec.completions is mutable iff spec.completions + // is equal to spec.parallelism before and after the update. + ElasticIndexedJob featuregate.Feature = "ElasticIndexedJob" + // owner: @saschagrunert // kep: https://kep.k8s.io/2413 // alpha: v1.22 @@ -1023,6 +1032,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta}, + ElasticIndexedJob: {Default: true, PreRelease: featuregate.Beta}, + SeccompDefault: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 ServiceIPStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 7d6fcfd8e1d..ba6a8ce4515 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -31,9 +31,12 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" @@ -1026,6 +1029,196 @@ func TestIndexedJob(t *testing.T) { validateTerminatedPodsTrackingFinalizerMetric(t, 5) } +func TestElasticIndexedJob(t *testing.T) { + const ( + noCompletionsUpdate = -1 + initialCompletions = 3 + ) + type jobUpdate struct { + completions int + succeededIndexes []int + failedIndexes []int + 
wantSucceededIndexes string + wantFailed int + } + cases := map[string]struct { + featureGate bool + jobUpdates []jobUpdate + wantErr *apierrors.StatusError + }{ + "feature flag off, mutation not allowed": { + jobUpdates: []jobUpdate{ + { + completions: 4, + }, + }, + wantErr: apierrors.NewInvalid( + schema.GroupKind{Group: "batch", Kind: "Job"}, + "test-job", + field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")}, + ), + }, + "scale up, verify that the range expands, and the job finishes successfully when indexes including the ones in the new range exit successfully": { + featureGate: true, + jobUpdates: []jobUpdate{ + { + // Scale up completions 3->4 then succeed indexes 0-3 + completions: 4, + succeededIndexes: []int{0, 1, 2, 3}, + wantSucceededIndexes: "0-3", + }, + }, + }, + "scale down, verify that the range shrinks, and the job finishes successfully when indexes only in the smaller range finishes successfully, and verify that failures that happened for indexes that are now outside the range still count": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 1 and fail index 2 while completions is still original value (3). + { + completions: noCompletionsUpdate, + succeededIndexes: []int{1}, + failedIndexes: []int{2}, + wantSucceededIndexes: "1", + wantFailed: 1, + }, + // Scale down completions 3->1, verify prev failure out of range still counts + // but succeeded out of range does not. + { + completions: 1, + succeededIndexes: []int{0}, + wantSucceededIndexes: "0", + wantFailed: 1, + }, + }, + }, + "index finishes successfully, scale down to exclude the index, then scale up to include it back. verify that the index was restarted and job finishes successfully after all indexes complete": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 2 while completions is still original value (3). 
+ { + completions: noCompletionsUpdate, + succeededIndexes: []int{2}, + wantSucceededIndexes: "2", + }, + // Scale completions down 3->2 to exclude previously succeeded index. + { + completions: 2, + }, + // Scale completions back up to include previously succeeded index that was temporarily out of range. + { + completions: 3, + succeededIndexes: []int{0, 1, 2}, + wantSucceededIndexes: "0-2", + }, + }, + }, + "scale down to 0, verify that the job succeeds": { + featureGate: true, + jobUpdates: []jobUpdate{ + {}, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)() + closeFn, restConfig, clientSet, ns := setup(t, "indexed") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + resetMetrics() + + // Set up initial Job in Indexed completion mode. + mode := batchv1.IndexedCompletion + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32Ptr(int32(initialCompletions)), + Completions: pointer.Int32Ptr(int32(initialCompletions)), + CompletionMode: &mode, + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + + // Wait for pods to start up. + err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if job.Status.Active == int32(initialCompletions) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Error waiting for Job pods to become active: %v", err) + } + + currentCompletions := initialCompletions + + for _, update := range tc.jobUpdates { + // Update Job spec if necessary. 
+ if update.completions != noCompletionsUpdate { + if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { + j.Spec.Completions = pointer.Int32Ptr(int32(update.completions)) + j.Spec.Parallelism = pointer.Int32Ptr(int32(update.completions)) + }); err != nil { + if tc.wantErr == nil { + t.Fatalf("Failed to update Job: %v", err) + } + statusErr := err.(*apierrors.StatusError) + if diff := cmp.Diff(tc.wantErr, statusErr); diff != "" { + t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff) + } + return + } + currentCompletions = update.completions + } + + // Succeed specified indexes. + succeededSet := sets.NewInt() + if update.succeededIndexes != nil { + for _, idx := range update.succeededIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { + t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) + } + succeededSet.Insert(idx) + } + } + + // Fail specified indexes. + if update.failedIndexes != nil { + for _, idx := range update.failedIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { + t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) + } + } + } + remainingIndexes := remainingIndexSet(currentCompletions, succeededSet) + newActive := len(remainingIndexes) + + // initialCompletions == initial parallelism, and active must be <= parallelism. + if newActive > currentCompletions && currentCompletions != noCompletionsUpdate { + newActive = currentCompletions + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: newActive, + Succeeded: len(succeededSet), + Failed: update.wantFailed, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, remainingIndexes, update.wantSucceededIndexes) + } + + validateJobSucceeded(ctx, t, clientSet, jobObj) + }) + } +} + // BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job. 
// We expect that large jobs are more commonly used as Indexed. And they are // also faster to track, as they need less API calls. @@ -1884,3 +2077,13 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri }) return job, err } + +func remainingIndexSet(completions int, exclude sets.Int) sets.Int { + s := sets.NewInt() + for i := 0; i < completions; i++ { + if !exclude.Has(i) { + s.Insert(i) + } + } + return s +} From b0b0959b92f8680d86b3029f17d476aedfa5fd12 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Wed, 15 Feb 2023 18:26:46 +0000 Subject: [PATCH 2/5] address comments --- test/integration/job/job_test.go | 61 +++++++++++++------------------- 1 file changed, 24 insertions(+), 37 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index ba6a8ce4515..d12bf4203a4 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1036,10 +1036,12 @@ func TestElasticIndexedJob(t *testing.T) { ) type jobUpdate struct { completions int - succeededIndexes []int - failedIndexes []int + succeedIndexes []int + failIndexes []int wantSucceededIndexes string wantFailed int + wantRemainingIndexes sets.Int + wantActivePods int } cases := map[string]struct { featureGate bool @@ -1064,7 +1066,7 @@ func TestElasticIndexedJob(t *testing.T) { { // Scale up completions 3->4 then succeed indexes 0-3 completions: 4, - succeededIndexes: []int{0, 1, 2, 3}, + succeedIndexes: []int{0, 1, 2, 3}, wantSucceededIndexes: "0-3", }, }, @@ -1075,16 +1077,18 @@ func TestElasticIndexedJob(t *testing.T) { // First succeed index 1 and fail index 2 while completions is still original value (3). 
{ completions: noCompletionsUpdate, - succeededIndexes: []int{1}, - failedIndexes: []int{2}, + succeedIndexes: []int{1}, + failIndexes: []int{2}, wantSucceededIndexes: "1", wantFailed: 1, + wantRemainingIndexes: sets.NewInt(0, 2), + wantActivePods: 2, }, // Scale down completions 3->1, verify prev failure out of range still counts // but succeeded out of range does not. { completions: 1, - succeededIndexes: []int{0}, + succeedIndexes: []int{0}, wantSucceededIndexes: "0", wantFailed: 1, }, @@ -1096,17 +1100,21 @@ func TestElasticIndexedJob(t *testing.T) { // First succeed index 2 while completions is still original value (3). { completions: noCompletionsUpdate, - succeededIndexes: []int{2}, + succeedIndexes: []int{2}, wantSucceededIndexes: "2", + wantRemainingIndexes: sets.NewInt(0, 1), + wantActivePods: 2, }, // Scale completions down 3->2 to exclude previously succeeded index. { - completions: 2, + completions: 2, + wantRemainingIndexes: sets.NewInt(0, 1), + wantActivePods: 2, }, // Scale completions back up to include previously succeeded index that was temporarily out of range. { completions: 3, - succeededIndexes: []int{0, 1, 2}, + succeedIndexes: []int{0, 1, 2}, wantSucceededIndexes: "0-2", }, }, @@ -1158,8 +1166,6 @@ func TestElasticIndexedJob(t *testing.T) { t.Fatalf("Error waiting for Job pods to become active: %v", err) } - currentCompletions := initialCompletions - for _, update := range tc.jobUpdates { // Update Job spec if necessary. if update.completions != noCompletionsUpdate { @@ -1176,42 +1182,33 @@ func TestElasticIndexedJob(t *testing.T) { } return } - currentCompletions = update.completions } // Succeed specified indexes. 
- succeededSet := sets.NewInt() - if update.succeededIndexes != nil { - for _, idx := range update.succeededIndexes { + if update.succeedIndexes != nil { + for _, idx := range update.succeedIndexes { if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) } - succeededSet.Insert(idx) } } // Fail specified indexes. - if update.failedIndexes != nil { - for _, idx := range update.failedIndexes { + if update.failIndexes != nil { + for _, idx := range update.failIndexes { if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) } } } - remainingIndexes := remainingIndexSet(currentCompletions, succeededSet) - newActive := len(remainingIndexes) - // initialCompletions == initial parallelism, and active must be <= parallelism. - if newActive > currentCompletions && currentCompletions != noCompletionsUpdate { - newActive = currentCompletions - } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: newActive, - Succeeded: len(succeededSet), + Active: update.wantActivePods, + Succeeded: len(update.succeedIndexes), Failed: update.wantFailed, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, remainingIndexes, update.wantSucceededIndexes) + validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes) } validateJobSucceeded(ctx, t, clientSet, jobObj) @@ -2077,13 +2074,3 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri }) return job, err } - -func remainingIndexSet(completions int, exclude sets.Int) sets.Int { - s := sets.NewInt() - for i := 0; i < completions; i++ { - if !exclude.Has(i) { - s.Insert(i) - } - } - return s -} From c63f4484510832f3fe7b53e18ba48e25fdd3f8d9 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Fri, 17 Feb 2023 23:02:05 +0000 
Subject: [PATCH 3/5] change test names and address other comments --- pkg/apis/batch/validation/validation.go | 8 ++-- test/integration/job/job_test.go | 61 ++++++++++--------------- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 0bac3f6fbb1..22a531031d3 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -568,13 +568,15 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o } func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path) field.ErrorList { - var allErrs field.ErrorList - + if !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) { + return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) + } // Completions is immutable for non-indexed jobs. // For Indexed Jobs, if ElasticIndexedJob feature gate is not enabled, // fall back to validating that spec.Completions is always immutable. 
+ var allErrs field.ErrorList isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion - if !isIndexedJob || !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) { + if !isIndexedJob { return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index d12bf4203a4..0e118c83efc 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1030,12 +1030,9 @@ func TestIndexedJob(t *testing.T) { } func TestElasticIndexedJob(t *testing.T) { - const ( - noCompletionsUpdate = -1 - initialCompletions = 3 - ) + const initialCompletions int32 = 3 type jobUpdate struct { - completions int + completions *int32 succeedIndexes []int failIndexes []int wantSucceededIndexes string @@ -1051,7 +1048,7 @@ func TestElasticIndexedJob(t *testing.T) { "feature flag off, mutation not allowed": { jobUpdates: []jobUpdate{ { - completions: 4, + completions: pointer.Int32Ptr(4), }, }, wantErr: apierrors.NewInvalid( @@ -1060,23 +1057,22 @@ func TestElasticIndexedJob(t *testing.T) { field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")}, ), }, - "scale up, verify that the range expands, and the job finishes successfully when indexes including the ones in the new range exit successfully": { + "scale up": { featureGate: true, jobUpdates: []jobUpdate{ { // Scale up completions 3->4 then succeed indexes 0-3 - completions: 4, + completions: pointer.Int32Ptr(4), succeedIndexes: []int{0, 1, 2, 3}, wantSucceededIndexes: "0-3", }, }, }, - "scale down, verify that the range shrinks, and the job finishes successfully when indexes only in the smaller range finishes successfully, and verify that failures that happened for indexes that are now outside the range still count": { + "scale down": { featureGate: true, jobUpdates: []jobUpdate{ // First succeed index 1 and fail index 2 while 
completions is still original value (3). { - completions: noCompletionsUpdate, succeedIndexes: []int{1}, failIndexes: []int{2}, wantSucceededIndexes: "1", @@ -1087,19 +1083,18 @@ func TestElasticIndexedJob(t *testing.T) { // Scale down completions 3->1, verify prev failure out of range still counts // but succeeded out of range does not. { - completions: 1, + completions: pointer.Int32Ptr(1), succeedIndexes: []int{0}, wantSucceededIndexes: "0", wantFailed: 1, }, }, }, - "index finishes successfully, scale down to exclude the index, then scale up to include it back. verify that the index was restarted and job finishes successfully after all indexes complete": { + "index finishes successfully, scale down, scale up": { featureGate: true, jobUpdates: []jobUpdate{ // First succeed index 2 while completions is still original value (3). { - completions: noCompletionsUpdate, succeedIndexes: []int{2}, wantSucceededIndexes: "2", wantRemainingIndexes: sets.NewInt(0, 1), @@ -1107,13 +1102,13 @@ func TestElasticIndexedJob(t *testing.T) { }, // Scale completions down 3->2 to exclude previously succeeded index. { - completions: 2, + completions: pointer.Int32Ptr(2), wantRemainingIndexes: sets.NewInt(0, 1), wantActivePods: 2, }, // Scale completions back up to include previously succeeded index that was temporarily out of range. 
{ - completions: 3, + completions: pointer.Int32Ptr(3), succeedIndexes: []int{0, 1, 2}, wantSucceededIndexes: "0-2", }, @@ -1122,7 +1117,9 @@ func TestElasticIndexedJob(t *testing.T) { "scale down to 0, verify that the job succeeds": { featureGate: true, jobUpdates: []jobUpdate{ - {}, + { + completions: pointer.Int32Ptr(0), + }, }, }, } @@ -1141,8 +1138,8 @@ func TestElasticIndexedJob(t *testing.T) { mode := batchv1.IndexedCompletion jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(int32(initialCompletions)), - Completions: pointer.Int32Ptr(int32(initialCompletions)), + Parallelism: pointer.Int32Ptr(initialCompletions), + Completions: pointer.Int32Ptr(initialCompletions), CompletionMode: &mode, }, }) @@ -1168,16 +1165,12 @@ func TestElasticIndexedJob(t *testing.T) { for _, update := range tc.jobUpdates { // Update Job spec if necessary. - if update.completions != noCompletionsUpdate { + if update.completions != nil { if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { - j.Spec.Completions = pointer.Int32Ptr(int32(update.completions)) - j.Spec.Parallelism = pointer.Int32Ptr(int32(update.completions)) + j.Spec.Completions = update.completions + j.Spec.Parallelism = update.completions }); err != nil { - if tc.wantErr == nil { - t.Fatalf("Failed to update Job: %v", err) - } - statusErr := err.(*apierrors.StatusError) - if diff := cmp.Diff(tc.wantErr, statusErr); diff != "" { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff) } return @@ -1185,20 +1178,16 @@ func TestElasticIndexedJob(t *testing.T) { } // Succeed specified indexes. 
- if update.succeedIndexes != nil { - for _, idx := range update.succeedIndexes { - if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { - t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) - } + for _, idx := range update.succeedIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { + t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) } } // Fail specified indexes. - if update.failIndexes != nil { - for _, idx := range update.failIndexes { - if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { - t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) - } + for _, idx := range update.failIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { + t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) } } From 15077a0f28ffc00576ae95af4a5e17e5917664cd Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Thu, 23 Feb 2023 03:20:52 +0000 Subject: [PATCH 4/5] plumb feature gate value through job validation opts and modify validateCompletions function to only check completions == parallelism after the update, not before --- pkg/apis/batch/validation/validation.go | 27 ++++++++++---------- pkg/apis/batch/validation/validation_test.go | 20 ++++++++++++--- pkg/registry/batch/job/strategy.go | 2 ++ 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 22a531031d3..840b8408420 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -25,17 +25,16 @@ import ( "github.com/robfig/cron/v3" v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" unversionedvalidation "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/labels" 
"k8s.io/apimachinery/pkg/util/sets" apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" - "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" ) @@ -377,7 +376,7 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList { func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { allErrs := field.ErrorList{} allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...) - allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"))...) + allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"), opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...) allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...) @@ -567,31 +566,31 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o return allErrs } -func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path) field.ErrorList { - if !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) { +func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { + if !opts.AllowElasticIndexedJobs { return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) } + // Completions is immutable for non-indexed jobs. // For Indexed Jobs, if ElasticIndexedJob feature gate is not enabled, // fall back to validating that spec.Completions is always immutable. 
- var allErrs field.ErrorList isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion if !isIndexedJob { return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) } + var allErrs field.ErrorList + if apiequality.Semantic.DeepEqual(spec.Completions, oldSpec.Completions) { + return allErrs + } // Indexed Jobs cannot set completions to nil. The nil check // is already performed in validateJobSpec, no need to add another error. - if spec.Completions == nil || oldSpec.Completions == nil { + if spec.Completions == nil { return allErrs } - // For Indexed Jobs, spec.Completions can only be mutated in tandem - // with spec.Parallelism such that spec.Completions == spec.Parallelism - if *spec.Completions != *oldSpec.Completions { - if *spec.Completions != *spec.Parallelism || *oldSpec.Completions != *oldSpec.Parallelism { - allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String()))) - } + if *spec.Completions != *spec.Parallelism { + allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String()))) } return allErrs } @@ -602,4 +601,6 @@ type JobValidationOptions struct { AllowTrackingAnnotation bool // Allow mutable node affinity, selector and tolerations of the template AllowMutableSchedulingDirectives bool + // Allow elastic indexed jobs + AllowElasticIndexedJobs bool } diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index 7055c245978..4d1c9d97209 100644 --- a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -1298,6 +1298,9 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = pointer.Int32Ptr(2) job.Spec.Parallelism = pointer.Int32Ptr(2) }, + opts: JobValidationOptions{ + 
AllowElasticIndexedJobs: true, + }, }, "previous parallelism != previous completions, new parallelism == new completions": { old: batch.Job{ @@ -1314,9 +1317,8 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = pointer.Int32Ptr(3) job.Spec.Parallelism = pointer.Int32Ptr(3) }, - err: &field.Error{ - Type: field.ErrorTypeInvalid, - Field: "spec.completions", + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, }, }, "indexed job updating completions and parallelism to different values is invalid": { @@ -1334,6 +1336,9 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = pointer.Int32Ptr(2) job.Spec.Parallelism = pointer.Int32Ptr(3) }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, err: &field.Error{ Type: field.ErrorTypeInvalid, Field: "spec.completions", @@ -1354,6 +1359,9 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = nil job.Spec.Parallelism = pointer.Int32Ptr(3) }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, err: &field.Error{ Type: field.ErrorTypeRequired, Field: "spec.completions", @@ -1374,6 +1382,9 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = pointer.Int32Ptr(2) job.Spec.Parallelism = pointer.Int32Ptr(1) }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, }, "indexed job with completions unchanged, parallelism increased higher than completions": { old: batch.Job{ @@ -1390,6 +1401,9 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.Completions = pointer.Int32Ptr(2) job.Spec.Parallelism = pointer.Int32Ptr(3) }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, }, } ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail") diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 6f9789ad488..e0b3d8104fd 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -185,6 +185,8 @@ func 
validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) && suspended && notStarted + // Elastic indexed jobs (mutable completions iff updated parallelism == updated completions) + opts.AllowElasticIndexedJobs = utilfeature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) } return opts From 8d31da4599f67e94498217fe43724acd6c8c0ef2 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Thu, 23 Feb 2023 21:05:03 +0000 Subject: [PATCH 5/5] add validation test case for immutable completions on indexed jobs when AllowElasticIndexedJobs is false --- pkg/apis/batch/validation/validation_test.go | 16 ++++++++++++++++ pkg/registry/batch/job/strategy.go | 5 ++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index 4d1c9d97209..9fe06fada50 100644 --- a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -935,6 +935,22 @@ func TestValidateJobUpdate(t *testing.T) { Field: "spec.completions", }, }, + "immutable completions for indexed job when AllowElasticIndexedJobs is false": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(1) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, "immutable selector": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index e0b3d8104fd..4d485a06428 100644 --- a/pkg/registry/batch/job/strategy.go +++ 
b/pkg/registry/batch/job/strategy.go @@ -184,11 +184,10 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid notStarted := oldJob.Status.StartTime == nil opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) && suspended && notStarted - - // Elastic indexed jobs (mutable completions iff updated parallelism == updated completions) - opts.AllowElasticIndexedJobs = utilfeature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) } + // Elastic indexed jobs (mutable completions iff updated parallelism == updated completions) + opts.AllowElasticIndexedJobs = utilfeature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) return opts }