From c27f9fdeb79d2620678af669df1164ff5e804317 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Fri, 2 Jun 2023 14:59:36 -0400
Subject: [PATCH] Add warnings for big number of completions and parallelism

Change-Id: I63e192b1ce9da7d8bb04f8be1a6e19ec6fbbfa5a
---
 pkg/api/job/warnings.go                     |  52 +++++++++
 pkg/api/job/warnings_test.go                | 121 ++++++++++++++++++++
 pkg/registry/batch/cronjob/strategy.go      |   5 +-
 pkg/registry/batch/cronjob/strategy_test.go |   9 +-
 pkg/registry/batch/job/strategy.go          |   5 +-
 pkg/registry/batch/job/strategy_test.go     |  71 ++++++++++--
 6 files changed, 247 insertions(+), 16 deletions(-)
 create mode 100644 pkg/api/job/warnings.go
 create mode 100644 pkg/api/job/warnings_test.go

diff --git a/pkg/api/job/warnings.go b/pkg/api/job/warnings.go
new file mode 100644
index 00000000000..e04a4939be5
--- /dev/null
+++ b/pkg/api/job/warnings.go
@@ -0,0 +1,52 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package job
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/kubernetes/pkg/api/pod"
+	"k8s.io/kubernetes/pkg/apis/batch"
+	"k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/utils/pointer"
+)
+
+const (
+	completionsSoftLimit                        = 100_000
+	parallelismSoftLimitForUnlimitedCompletions = 10_000
+)
+
+// WarningsForJobSpec produces warnings for fields in the JobSpec.
+func WarningsForJobSpec(ctx context.Context, path *field.Path, spec, oldSpec *batch.JobSpec) []string {
+	var warnings []string
+	if spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion {
+		completions := pointer.Int32Deref(spec.Completions, 0)
+		parallelism := pointer.Int32Deref(spec.Parallelism, 0)
+		if completions > completionsSoftLimit && parallelism > parallelismSoftLimitForUnlimitedCompletions {
+			msg := "In Indexed Jobs with a number of completions higher than 10^5 and a parallelism higher than 10^4, Kubernetes might not be able to track completedIndexes when a big number of indexes fail"
+			warnings = append(warnings, fmt.Sprintf("%s: %s", path, msg))
+		}
+	}
+	var oldPodTemplate *core.PodTemplateSpec
+	if oldSpec != nil {
+		oldPodTemplate = &oldSpec.Template
+	}
+	warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, path.Child("template"), &spec.Template, oldPodTemplate)...)
+	return warnings
+}
diff --git a/pkg/api/job/warnings_test.go b/pkg/api/job/warnings_test.go
new file mode 100644
index 00000000000..0783abc12a7
--- /dev/null
+++ b/pkg/api/job/warnings_test.go
@@ -0,0 +1,121 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package job
+
+import (
+	"testing"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/apis/batch"
+	"k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/test/utils/ktesting"
+	"k8s.io/utils/pointer"
+)
+
+var (
+	validSelector = &metav1.LabelSelector{
+		MatchLabels: map[string]string{"a": "b"},
+	}
+
+	validPodTemplate = core.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: validSelector.MatchLabels,
+		},
+		Spec: core.PodSpec{
+			RestartPolicy: core.RestartPolicyOnFailure,
+			DNSPolicy:     core.DNSClusterFirst,
+			Containers:    []core.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: core.TerminationMessageReadFile}},
+		},
+	}
+)
+
+func TestWarningsForJobSpec(t *testing.T) {
+	cases := map[string]struct {
+		spec              *batch.JobSpec
+		wantWarningsCount int
+	}{
+		"valid NonIndexed": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.NonIndexedCompletion),
+				Template:       validPodTemplate,
+			},
+		},
+		"NonIndexed with high completions and parallelism": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.NonIndexedCompletion),
+				Template:       validPodTemplate,
+				Parallelism:    pointer.Int32(1_000_000_000),
+				Completions:    pointer.Int32(1_000_000_000),
+			},
+		},
+		"invalid PodTemplate": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.NonIndexedCompletion),
+				Template: core.PodTemplateSpec{
+					Spec: core.PodSpec{ImagePullSecrets: []core.LocalObjectReference{{Name: ""}}},
+				},
+			},
+			wantWarningsCount: 1,
+		},
+		"valid Indexed low completions low parallelism": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.IndexedCompletion),
+				Completions:    pointer.Int32(10_000),
+				Parallelism:    pointer.Int32(10_000),
+				Template:       validPodTemplate,
+			},
+		},
+		"valid Indexed high completions low parallelism": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.IndexedCompletion),
+				Completions:    pointer.Int32(1_000_000_000),
+				Parallelism:    pointer.Int32(10_000),
+				Template:       validPodTemplate,
+			},
+		},
+		"valid Indexed medium completions medium parallelism": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.IndexedCompletion),
+				Completions:    pointer.Int32(100_000),
+				Parallelism:    pointer.Int32(100_000),
+				Template:       validPodTemplate,
+			},
+		},
+		"invalid Indexed high completions high parallelism": {
+			spec: &batch.JobSpec{
+				CompletionMode: completionModePtr(batch.IndexedCompletion),
+				Completions:    pointer.Int32(100_001),
+				Parallelism:    pointer.Int32(10_001),
+				Template:       validPodTemplate,
+			},
+			wantWarningsCount: 1,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			warnings := WarningsForJobSpec(ctx, nil, tc.spec, nil)
+			if len(warnings) != tc.wantWarningsCount {
+				t.Errorf("Got %d warnings, want %d.\nWarnings: %v", len(warnings), tc.wantWarningsCount, warnings)
+			}
+		})
+	}
+}
+
+func completionModePtr(m batch.CompletionMode) *batch.CompletionMode {
+	return &m
+}
diff --git a/pkg/registry/batch/cronjob/strategy.go b/pkg/registry/batch/cronjob/strategy.go
index c15a164be0c..62233ba2ae6 100644
--- a/pkg/registry/batch/cronjob/strategy.go
+++ b/pkg/registry/batch/cronjob/strategy.go
@@ -30,6 +30,7 @@ import (
 	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/rest"
 	"k8s.io/apiserver/pkg/storage/names"
+	"k8s.io/kubernetes/pkg/api/job"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/batch" @@ -121,7 +122,7 @@ func (cronJobStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) if msgs := utilvalidation.IsDNS1123Label(newCronJob.Name); len(msgs) != 0 { warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS label is recommended: %v", msgs)) } - warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "jobTemplate", "spec", "template"), &newCronJob.Spec.JobTemplate.Spec.Template, nil)...) + warnings = append(warnings, job.WarningsForJobSpec(ctx, field.NewPath("spec", "jobTemplate", "spec"), &newCronJob.Spec.JobTemplate.Spec, nil)...) if strings.Contains(newCronJob.Spec.Schedule, "TZ") { warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule"))) } @@ -156,7 +157,7 @@ func (cronJobStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Ob newCronJob := obj.(*batch.CronJob) oldCronJob := old.(*batch.CronJob) if newCronJob.Generation != oldCronJob.Generation { - warnings = pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "jobTemplate", "spec", "template"), &newCronJob.Spec.JobTemplate.Spec.Template, &oldCronJob.Spec.JobTemplate.Spec.Template) + warnings = job.WarningsForJobSpec(ctx, field.NewPath("spec", "jobTemplate", "spec"), &newCronJob.Spec.JobTemplate.Spec, &oldCronJob.Spec.JobTemplate.Spec) } if strings.Contains(newCronJob.Spec.Schedule, "TZ") { warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule"))) diff --git a/pkg/registry/batch/cronjob/strategy_test.go b/pkg/registry/batch/cronjob/strategy_test.go index 1369480f0de..86e8e3b090c 100644 --- a/pkg/registry/batch/cronjob/strategy_test.go +++ b/pkg/registry/batch/cronjob/strategy_test.go @@ -41,7 +41,10 @@ var ( TimeZone: pointer.String("Asia/Shanghai"), JobTemplate: batch.JobTemplateSpec{ Spec: batch.JobSpec{ - Template: validPodTemplateSpec, + Template: validPodTemplateSpec, + CompletionMode: completionModePtr(batch.IndexedCompletion), + Completions: pointer.Int32(10), + Parallelism: pointer.Int32(10), }, }, } @@ -57,6 +60,10 @@ var ( } ) +func completionModePtr(m batch.CompletionMode) *batch.CompletionMode { + return &m +} + func TestCronJobStrategy(t *testing.T) { ctx := genericapirequest.NewDefaultContext() if !Strategy.NamespaceScoped() { diff --git a/pkg/registry/batch/job/strategy.go b/pkg/registry/batch/job/strategy.go index 6e7056bc481..918e21c3622 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -37,6 +37,7 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/api/job" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/batch" @@ -171,7 +172,7 @@ func (jobStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) []s if msgs := utilvalidation.IsDNS1123Label(newJob.Name); len(msgs) != 0 { warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS 
 	}
-	warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "template"), &newJob.Spec.Template, nil)...)
+	warnings = append(warnings, job.WarningsForJobSpec(ctx, field.NewPath("spec"), &newJob.Spec, nil)...)
 	return warnings
 }
@@ -261,7 +262,7 @@ func (jobStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Object
 	newJob := obj.(*batch.Job)
 	oldJob := old.(*batch.Job)
 	if newJob.Generation != oldJob.Generation {
-		warnings = pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "template"), &newJob.Spec.Template, &oldJob.Spec.Template)
+		warnings = job.WarningsForJobSpec(ctx, field.NewPath("spec"), &newJob.Spec, &oldJob.Spec)
 	}
 	return warnings
 }
diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go
index 5dbcfe89314..22c7bdee8ec 100644
--- a/pkg/registry/batch/job/strategy_test.go
+++ b/pkg/registry/batch/job/strategy_test.go
@@ -836,6 +836,37 @@ func TestJobStrategy_WarningsOnUpdate(t *testing.T) {
 			},
 			wantWarningsCount: 1,
 		},
+		"Invalid transition to high parallelism": {
+			wantWarningsCount: 1,
+			job: &batch.Job{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:            "myjob2",
+					Namespace:       metav1.NamespaceDefault,
+					Generation:      1,
+					ResourceVersion: "0",
+				},
+				Spec: batch.JobSpec{
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Completions:    pointer.Int32(100_001),
+					Parallelism:    pointer.Int32(10_001),
+					Template:       validPodTemplateSpec,
+				},
+			},
+			oldJob: &batch.Job{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:            "myjob2",
+					Namespace:       metav1.NamespaceDefault,
+					Generation:      0,
+					ResourceVersion: "0",
+				},
+				Spec: batch.JobSpec{
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Completions:    pointer.Int32(100_001),
+					Parallelism:    pointer.Int32(10_000),
+					Template:       validPodTemplateSpec,
+				},
+			},
+		},
 	}
 	for val, tc := range cases {
 		t.Run(val, func(t *testing.T) {
@@ -853,18 +884,20 @@ func TestJobStrategy_WarningsOnCreate(t *testing.T) {
 	validSelector := &metav1.LabelSelector{
 		MatchLabels: map[string]string{"a": "b"},
 	}
-	validSpec := batch.JobSpec{
-		Selector: nil,
-		Template: api.PodTemplateSpec{
-			ObjectMeta: metav1.ObjectMeta{
-				Labels: validSelector.MatchLabels,
-			},
-			Spec: api.PodSpec{
-				RestartPolicy: api.RestartPolicyOnFailure,
-				DNSPolicy:     api.DNSClusterFirst,
-				Containers:    []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
-			},
+	validPodTemplate := api.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: validSelector.MatchLabels,
 		},
+		Spec: api.PodSpec{
+			RestartPolicy: api.RestartPolicyOnFailure,
+			DNSPolicy:     api.DNSClusterFirst,
+			Containers:    []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
+		},
+	}
+	validSpec := batch.JobSpec{
+		CompletionMode: completionModePtr(batch.NonIndexedCompletion),
+		Selector:       nil,
+		Template:       validPodTemplate,
 	}
 
 	testcases := map[string]struct {
@@ -892,6 +925,22 @@ func TestJobStrategy_WarningsOnCreate(t *testing.T) {
 				Spec: validSpec,
 			},
 		},
+		"high completions and parallelism": {
+			wantWarningsCount: 1,
+			job: &batch.Job{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "myjob2",
+					Namespace: metav1.NamespaceDefault,
+					UID:       theUID,
+				},
+				Spec: batch.JobSpec{
+					CompletionMode: completionModePtr(batch.IndexedCompletion),
+					Parallelism:    pointer.Int32(100_001),
+					Completions:    pointer.Int32(100_001),
+					Template:       validPodTemplate,
+				},
+			},
+		},
 	}
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
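Below is a standalone sketch (not part of the patch, and not how the apiserver wires it up) that mirrors the soft-limit check from WarningsForJobSpec so the warning condition can be exercised in isolation; the constants match the patch, while the helper name wouldWarn is invented for illustration.

package main

import "fmt"

// Soft limits copied from pkg/api/job/warnings.go in the patch above.
const (
	completionsSoftLimit                        = 100_000
	parallelismSoftLimitForUnlimitedCompletions = 10_000
)

// wouldWarn mirrors the condition WarningsForJobSpec applies to Indexed Jobs:
// a warning is emitted only when completions and parallelism both exceed
// their soft limits.
func wouldWarn(completions, parallelism int32) bool {
	return completions > completionsSoftLimit && parallelism > parallelismSoftLimitForUnlimitedCompletions
}

func main() {
	fmt.Println(wouldWarn(100_000, 10_000))  // false: values at the limits do not warn
	fmt.Println(wouldWarn(100_001, 10_001))  // true: matches the "high completions high parallelism" test cases
	fmt.Println(wouldWarn(1_000_000, 5_000)) // false: high completions alone stay silent
}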