From d41302312ef0b938e7a8c97021ee47e64bc9b173 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Fri, 20 Jan 2023 23:55:54 +0000 Subject: [PATCH] update validation logic so completions is mutable iff completions is modified in tandem with parallelsim so completions == parallelism --- pkg/apis/batch/validation/validation.go | 31 ++- pkg/apis/batch/validation/validation_test.go | 110 +++++++++- pkg/controller/job/job_controller_test.go | 3 +- pkg/features/kube_features.go | 11 + test/integration/job/job_test.go | 203 +++++++++++++++++++ 5 files changed, 354 insertions(+), 4 deletions(-) diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 26c365b2f98..0bac3f6fbb1 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -31,9 +31,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" ) @@ -375,7 +377,7 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList { func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList { allErrs := field.ErrorList{} allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...) - allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...) + allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"))...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...) allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...) @@ -565,6 +567,33 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o return allErrs } +func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path) field.ErrorList { + var allErrs field.ErrorList + + // Completions is immutable for non-indexed jobs. + // For Indexed Jobs, if ElasticIndexedJob feature gate is not enabled, + // fall back to validating that spec.Completions is always immutable. + isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion + if !isIndexedJob || !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) { + return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath) + } + + // Indexed Jobs cannot set completions to nil. The nil check + // is already performed in validateJobSpec, no need to add another error. + if spec.Completions == nil || oldSpec.Completions == nil { + return allErrs + } + + // For Indexed Jobs, spec.Completions can only be mutated in tandem + // with spec.Parallelism such that spec.Completions == spec.Parallelism + if *spec.Completions != *oldSpec.Completions { + if *spec.Completions != *spec.Parallelism || *oldSpec.Completions != *oldSpec.Parallelism { + allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String()))) + } + } + return allErrs +} + type JobValidationOptions struct { apivalidation.PodValidationOptions // Allow Job to have the annotation batch.kubernetes.io/job-tracking diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index b15f3bdd7b5..7055c245978 100644 --- a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -919,7 +919,7 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.ManualSelector = pointer.BoolPtr(true) }, }, - "immutable completion": { + "immutable completions for non-indexed jobs": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, Spec: batch.JobSpec{ @@ -1283,6 +1283,114 @@ func TestValidateJobUpdate(t *testing.T) { AllowMutableSchedulingDirectives: true, }, }, + "update completions and parallelism to same value is valid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(2) + }, + }, + "previous parallelism != previous completions, new parallelism == new completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(3) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "indexed job updating completions and parallelism to different values is invalid": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "spec.completions", + }, + }, + "indexed job with completions set updated to nil does not panic": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(1), + Parallelism: pointer.Int32Ptr(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = nil + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + err: &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.completions", + }, + }, + "indexed job with completions unchanged, parallelism reduced to less than completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(1) + }, + }, + "indexed job with completions unchanged, parallelism increased higher than completions": { + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + Completions: pointer.Int32Ptr(2), + Parallelism: pointer.Int32Ptr(2), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: func(job *batch.Job) { + job.Spec.Completions = pointer.Int32Ptr(2) + job.Spec.Parallelism = pointer.Int32Ptr(3) + }, + }, } ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail") for k, tc := range cases { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9f5b3d7b1f2..6cfadb3269c 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -237,8 +237,7 @@ func TestControllerSyncJob(t *testing.T) { expectedConditionStatus v1.ConditionStatus expectedConditionReason string expectedCreatedIndexes sets.Int - - expectedPodPatches int + expectedPodPatches int // features jobReadyPodsEnabled bool diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 2c088b90faf..e27e12a740e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -685,6 +685,15 @@ const ( // certificate as expiration approaches. RotateKubeletServerCertificate featuregate.Feature = "RotateKubeletServerCertificate" + // owner: @danielvegamyhre + // kep: https://kep.k8s.io/2413 + // beta: v1.27 + // + // Allows mutating spec.completions for Indexed job when done in tandem with + // spec.parallelism. Specifically, spec.completions is mutable iff spec.completions + // equals to spec.parallelism before and after the update. + ElasticIndexedJob featuregate.Feature = "ElasticIndexedJob" + // owner: @saschagrunert // kep: https://kep.k8s.io/2413 // alpha: v1.22 @@ -1023,6 +1032,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta}, + ElasticIndexedJob: {Default: true, PreRelease: featuregate.Beta}, + SeccompDefault: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29 ServiceIPStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28 diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 7d6fcfd8e1d..ba6a8ce4515 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -31,9 +31,12 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/util/feature" @@ -1026,6 +1029,196 @@ func TestIndexedJob(t *testing.T) { validateTerminatedPodsTrackingFinalizerMetric(t, 5) } +func TestElasticIndexedJob(t *testing.T) { + const ( + noCompletionsUpdate = -1 + initialCompletions = 3 + ) + type jobUpdate struct { + completions int + succeededIndexes []int + failedIndexes []int + wantSucceededIndexes string + wantFailed int + } + cases := map[string]struct { + featureGate bool + jobUpdates []jobUpdate + wantErr *apierrors.StatusError + }{ + "feature flag off, mutation not allowed": { + jobUpdates: []jobUpdate{ + { + completions: 4, + }, + }, + wantErr: apierrors.NewInvalid( + schema.GroupKind{Group: "batch", Kind: "Job"}, + "test-job", + field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")}, + ), + }, + "scale up, verify that the range expands, and the job finishes successfully when indexes including the ones in the new range exit successfully": { + featureGate: true, + jobUpdates: []jobUpdate{ + { + // Scale up completions 3->4 then succeed indexes 0-3 + completions: 4, + succeededIndexes: []int{0, 1, 2, 3}, + wantSucceededIndexes: "0-3", + }, + }, + }, + "scale down, verify that the range shrinks, and the job finishes successfully when indexes only in the smaller range finishes successfully, and verify that failures that happened for indexes that are now outside the range still count": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 1 and fail index 2 while completions is still original value (3). + { + completions: noCompletionsUpdate, + succeededIndexes: []int{1}, + failedIndexes: []int{2}, + wantSucceededIndexes: "1", + wantFailed: 1, + }, + // Scale down completions 3->1, verify prev failure out of range still counts + // but succeeded out of range does not. + { + completions: 1, + succeededIndexes: []int{0}, + wantSucceededIndexes: "0", + wantFailed: 1, + }, + }, + }, + "index finishes successfully, scale down to exclude the index, then scale up to include it back. verify that the index was restarted and job finishes successfully after all indexes complete": { + featureGate: true, + jobUpdates: []jobUpdate{ + // First succeed index 2 while completions is still original value (3). + { + completions: noCompletionsUpdate, + succeededIndexes: []int{2}, + wantSucceededIndexes: "2", + }, + // Scale completions down 3->2 to exclude previously succeeded index. + { + completions: 2, + }, + // Scale completions back up to include previously succeeded index that was temporarily out of range. + { + completions: 3, + succeededIndexes: []int{0, 1, 2}, + wantSucceededIndexes: "0-2", + }, + }, + }, + "scale down to 0, verify that the job succeeds": { + featureGate: true, + jobUpdates: []jobUpdate{ + {}, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)() + closeFn, restConfig, clientSet, ns := setup(t, "indexed") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + resetMetrics() + + // Set up initial Job in Indexed completion mode. + mode := batchv1.IndexedCompletion + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32Ptr(int32(initialCompletions)), + Completions: pointer.Int32Ptr(int32(initialCompletions)), + CompletionMode: &mode, + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + + // Wait for pods to start up. + err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if job.Status.Active == int32(initialCompletions) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("Error waiting for Job pods to become active: %v", err) + } + + currentCompletions := initialCompletions + + for _, update := range tc.jobUpdates { + // Update Job spec if necessary. + if update.completions != noCompletionsUpdate { + if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { + j.Spec.Completions = pointer.Int32Ptr(int32(update.completions)) + j.Spec.Parallelism = pointer.Int32Ptr(int32(update.completions)) + }); err != nil { + if tc.wantErr == nil { + t.Fatalf("Failed to update Job: %v", err) + } + statusErr := err.(*apierrors.StatusError) + if diff := cmp.Diff(tc.wantErr, statusErr); diff != "" { + t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff) + } + return + } + currentCompletions = update.completions + } + + // Succeed specified indexes. + succeededSet := sets.NewInt() + if update.succeededIndexes != nil { + for _, idx := range update.succeededIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { + t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) + } + succeededSet.Insert(idx) + } + } + + // Fail specified indexes. + if update.failedIndexes != nil { + for _, idx := range update.failedIndexes { + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { + t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) + } + } + } + remainingIndexes := remainingIndexSet(currentCompletions, succeededSet) + newActive := len(remainingIndexes) + + // initialCompletions == initial parallelism, and active must be <= parallelism. + if newActive > currentCompletions && currentCompletions != noCompletionsUpdate { + newActive = currentCompletions + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: newActive, + Succeeded: len(succeededSet), + Failed: update.wantFailed, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, remainingIndexes, update.wantSucceededIndexes) + } + + validateJobSucceeded(ctx, t, clientSet, jobObj) + }) + } +} + // BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job. // We expect that large jobs are more commonly used as Indexed. And they are // also faster to track, as they need less API calls. @@ -1884,3 +2077,13 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri }) return job, err } + +func remainingIndexSet(completions int, exclude sets.Int) sets.Int { + s := sets.NewInt() + for i := 0; i < completions; i++ { + if !exclude.Has(i) { + s.Insert(i) + } + } + return s +}