diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 26c365b2f98..840b8408420 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -25,6 +25,7 @@ import ( "github.com/robfig/cron/v3" v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" unversionedvalidation "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/labels" @@ -375,7 +376,7 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList { func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { allErrs := field.ErrorList{} allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...) - allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...) + allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"), opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...) allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...) @@ -565,10 +566,41 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o return allErrs } +func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { + if !opts.AllowElasticIndexedJobs { + return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) + } + + // Completions is immutable for non-indexed jobs. + // For Indexed Jobs, if ElasticIndexedJob feature gate is not enabled, + // fall back to validating that spec.Completions is always immutable. + isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion + if !isIndexedJob { + return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) + } + + var allErrs field.ErrorList + if apiequality.Semantic.DeepEqual(spec.Completions, oldSpec.Completions) { + return allErrs + } + // Indexed Jobs cannot set completions to nil. The nil check + // is already performed in validateJobSpec, no need to add another error. + if spec.Completions == nil { + return allErrs + } + + if *spec.Completions != *spec.Parallelism { + allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String()))) + } + return allErrs +} + type JobValidationOptions struct { apivalidation.PodValidationOptions // Allow Job to have the annotation batch.kubernetes.io/job-tracking AllowTrackingAnnotation bool // Allow mutable node affinity, selector and tolerations of the template AllowMutableSchedulingDirectives bool + // Allow elastic indexed jobs + AllowElasticIndexedJobs bool } diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index b15f3bdd7b5..9fe06fada50 100644 --- a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -919,7 +919,23 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.ManualSelector = pointer.BoolPtr(true) }, }, - "immutable completion": { + "immutable completions for non-indexed jobs": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(1) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "immutable completions for indexed job when AllowElasticIndexedJobs is false": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, Spec: batch.JobSpec{ @@ -1283,6 +1299,128 @@ func TestValidateJobUpdate(t *testing.T) { AllowMutableSchedulingDirectives: true, }, }, + "update completions and parallelism to same value is valid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(2) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + }, + "previous parallelism != previous completions, new parallelism == new completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(3) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + }, + "indexed job updating completions and parallelism to different values is invalid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "indexed job with completions set updated to nil does not panic": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = nil + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + err: &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.completions", + }, + }, + "indexed job with completions unchanged, parallelism reduced to less than completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(1) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + }, + "indexed job with completions unchanged, parallelism increased higher than completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + opts: JobValidationOptions{ + AllowElasticIndexedJobs: true, + }, + }, } ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail") for k, tc := range cases { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9f5b3d7b1f2..6cfadb3269c 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -237,8 +237,7 @@ func TestControllerSyncJob(t *testing.T) { expectedConditionStatus v1.ConditionStatus expectedConditionReason string expectedCreatedIndexes sets.Int - - expectedPodPatches int + expectedPodPatches int // features jobReadyPodsEnabled bool diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2c088b90faf..e27e12a740e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -685,6 +685,15 @@ const ( // certificate as expiration approaches. RotateKubeletServerCertificate featuregate.Feature = "RotateKubeletServerCertificate" + // owner: @danielvegamyhre + // kep: https://kep.k8s.io/2413 + // beta: v1.27 + // + // Allows mutating spec.completions for Indexed job when done in tandem with + // spec.parallelism. Specifically, spec.completions is mutable iff spec.completions + // equals to spec.parallelism before and after the update. + ElasticIndexedJob featuregate.Feature = "ElasticIndexedJob" + // owner: @saschagrunert // kep: https://kep.k8s.io/2413 // alpha: v1.22 @@ -1023,6 +1032,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta}, + ElasticIndexedJob: {Default: true, PreRelease: featuregate.Beta}, + SeccompDefault: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 ServiceIPStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 6f9789ad488..4d485a06428 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -184,9 +184,10 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid notStarted := oldJob.Status.StartTime == nil opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) && suspended && notStarted - } + // Elastic indexed jobs (mutable completions iff updated parallelism == updated completions) + opts.AllowElasticIndexedJobs = utilfeature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) return opts } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 7d6fcfd8e1d..0e118c83efc 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -31,9 +31,12 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" @@ -1026,6 +1029,182 @@ func TestIndexedJob(t *testing.T) { validateTerminatedPodsTrackingFinalizerMetric(t, 5) } +func TestElasticIndexedJob(t *testing.T) { + const initialCompletions int32 = 3 + type jobUpdate struct { + completions *int32 + succeedIndexes []int + failIndexes []int + wantSucceededIndexes string + wantFailed int + wantRemainingIndexes sets.Int + wantActivePods int + } + cases := map[string]struct { + featureGate bool + jobUpdates []jobUpdate + wantErr *apierrors.StatusError + }{ + "feature flag off, mutation not allowed": { + jobUpdates: []jobUpdate{ + { + completions: pointer.Int32Ptr(4), + }, + }, + wantErr: apierrors.NewInvalid( + schema.GroupKind{Group: "batch", Kind: "Job"}, + "test-job", + field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")}, + ), + }, + "scale up": { + featureGate: true, + jobUpdates: []jobUpdate{ + { + // Scale up completions 3->4 then succeed indexes 0-3 + completions: pointer.Int32Ptr(4), + succeedIndexes: []int{0, 1, 2, 3}, + wantSucceededIndexes: "0-3", + }, + }, + }, + "scale down": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 1 and fail index 2 while completions is still original value (3). + { + succeedIndexes: []int{1}, + failIndexes: []int{2}, + wantSucceededIndexes: "1", + wantFailed: 1, + wantRemainingIndexes: sets.NewInt(0, 2), + wantActivePods: 2, + }, + // Scale down completions 3->1, verify prev failure out of range still counts + // but succeeded out of range does not. + { + completions: pointer.Int32Ptr(1), + succeedIndexes: []int{0}, + wantSucceededIndexes: "0", + wantFailed: 1, + }, + }, + }, + "index finishes successfully, scale down, scale up": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 2 while completions is still original value (3). + { + succeedIndexes: []int{2}, + wantSucceededIndexes: "2", + wantRemainingIndexes: sets.NewInt(0, 1), + wantActivePods: 2, + }, + // Scale completions down 3->2 to exclude previously succeeded index. + { + completions: pointer.Int32Ptr(2), + wantRemainingIndexes: sets.NewInt(0, 1), + wantActivePods: 2, + }, + // Scale completions back up to include previously succeeded index that was temporarily out of range. + { + completions: pointer.Int32Ptr(3), + succeedIndexes: []int{0, 1, 2}, + wantSucceededIndexes: "0-2", + }, + }, + }, + "scale down to 0, verify that the job succeeds": { + featureGate: true, + jobUpdates: []jobUpdate{ + { + completions: pointer.Int32Ptr(0), + }, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)() + closeFn, restConfig, clientSet, ns := setup(t, "indexed") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + resetMetrics() + + // Set up initial Job in Indexed completion mode. + mode := batchv1.IndexedCompletion + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32Ptr(initialCompletions), + Completions: pointer.Int32Ptr(initialCompletions), + CompletionMode: &mode, + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + + // Wait for pods to start up. + err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if job.Status.Active == int32(initialCompletions) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Error waiting for Job pods to become active: %v", err) + } + + for _, update := range tc.jobUpdates { + // Update Job spec if necessary. + if update.completions != nil { + if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { + j.Spec.Completions = update.completions + j.Spec.Parallelism = update.completions + }); err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff) + } + return + } + } + + // Succeed specified indexes. + for _, idx := range update.succeedIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { + t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) + } + } + + // Fail specified indexes. + for _, idx := range update.failIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { + t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) + } + } + + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: update.wantActivePods, + Succeeded: len(update.succeedIndexes), + Failed: update.wantFailed, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes) + } + + validateJobSucceeded(ctx, t, clientSet, jobObj) + }) + } +} + // BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job. // We expect that large jobs are more commonly used as Indexed. And they are // also faster to track, as they need less API calls.