Add warnings for big number of completions and parallelism

Change-Id: I63e192b1ce9da7d8bb04f8be1a6e19ec6fbbfa5a
This commit is contained in:
Aldo Culquicondor 2023-06-02 14:59:36 -04:00
parent b53411ffb8
commit c27f9fdeb7
No known key found for this signature in database
GPG Key ID: C005626B875BD5E6
6 changed files with 247 additions and 16 deletions

52
pkg/api/job/warnings.go Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/utils/pointer"
)
const (
completionsSoftLimit = 100_000
parallelismSoftLimitForUnlimitedCompletions = 10_000
)
// WarningsForJobSpec produces warnings for fields in the JobSpec.
func WarningsForJobSpec(ctx context.Context, path *field.Path, spec, oldSpec *batch.JobSpec) []string {
var warnings []string
if spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion {
completions := pointer.Int32Deref(spec.Completions, 0)
parallelism := pointer.Int32Deref(spec.Parallelism, 0)
if completions > completionsSoftLimit && parallelism > parallelismSoftLimitForUnlimitedCompletions {
msg := "In Indexed Jobs with a number of completions higher than 10^5 and a parallelism higher than 10^4, Kubernetes might not be able to track completedIndexes when a big number of indexes fail"
warnings = append(warnings, fmt.Sprintf("%s: %s", path, msg))
}
}
var oldPodTemplate *core.PodTemplateSpec
if oldSpec != nil {
oldPodTemplate = &oldSpec.Template
}
warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, path.Child("template"), &spec.Template, oldPodTemplate)...)
return warnings
}

View File

@ -0,0 +1,121 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/pointer"
)
var (
validSelector = &metav1.LabelSelector{
MatchLabels: map[string]string{"a": "b"},
}
validPodTemplate = core.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: validSelector.MatchLabels,
},
Spec: core.PodSpec{
RestartPolicy: core.RestartPolicyOnFailure,
DNSPolicy: core.DNSClusterFirst,
Containers: []core.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: core.TerminationMessageReadFile}},
},
}
)
func TestWarningsForJobSpec(t *testing.T) {
cases := map[string]struct {
spec *batch.JobSpec
wantWarningsCount int
}{
"valid NonIndexed": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.NonIndexedCompletion),
Template: validPodTemplate,
},
},
"NondIndexed with high completions and parallelism": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.NonIndexedCompletion),
Template: validPodTemplate,
Parallelism: pointer.Int32(1_000_000_000),
Completions: pointer.Int32(1_000_000_000),
},
},
"invalid PodTemplate": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.NonIndexedCompletion),
Template: core.PodTemplateSpec{
Spec: core.PodSpec{ImagePullSecrets: []core.LocalObjectReference{{Name: ""}}},
},
},
wantWarningsCount: 1,
},
"valid Indexed low completions low parallelism": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(10_000),
Parallelism: pointer.Int32(10_000),
Template: validPodTemplate,
},
},
"valid Indexed high completions low parallelism": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(1000_000_000),
Parallelism: pointer.Int32(10_000),
Template: validPodTemplate,
},
},
"valid Indexed medium completions medium parallelism": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(100_000),
Parallelism: pointer.Int32(100_000),
Template: validPodTemplate,
},
},
"invalid Indexed high completions high parallelism": {
spec: &batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(100_001),
Parallelism: pointer.Int32(10_001),
Template: validPodTemplate,
},
wantWarningsCount: 1,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
warnings := WarningsForJobSpec(ctx, nil, tc.spec, nil)
if len(warnings) != tc.wantWarningsCount {
t.Errorf("Got %d warnings, want %d.\nWarnings: %v", len(warnings), tc.wantWarningsCount, warnings)
}
})
}
}
func completionModePtr(m batch.CompletionMode) *batch.CompletionMode {
return &m
}

View File

@ -30,6 +30,7 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage/names" "k8s.io/apiserver/pkg/storage/names"
"k8s.io/kubernetes/pkg/api/job"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
@ -121,7 +122,7 @@ func (cronJobStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object)
if msgs := utilvalidation.IsDNS1123Label(newCronJob.Name); len(msgs) != 0 { if msgs := utilvalidation.IsDNS1123Label(newCronJob.Name); len(msgs) != 0 {
warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS label is recommended: %v", msgs)) warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS label is recommended: %v", msgs))
} }
warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "jobTemplate", "spec", "template"), &newCronJob.Spec.JobTemplate.Spec.Template, nil)...) warnings = append(warnings, job.WarningsForJobSpec(ctx, field.NewPath("spec", "jobTemplate", "spec"), &newCronJob.Spec.JobTemplate.Spec, nil)...)
if strings.Contains(newCronJob.Spec.Schedule, "TZ") { if strings.Contains(newCronJob.Spec.Schedule, "TZ") {
warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule"))) warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule")))
} }
@ -156,7 +157,7 @@ func (cronJobStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Ob
newCronJob := obj.(*batch.CronJob) newCronJob := obj.(*batch.CronJob)
oldCronJob := old.(*batch.CronJob) oldCronJob := old.(*batch.CronJob)
if newCronJob.Generation != oldCronJob.Generation { if newCronJob.Generation != oldCronJob.Generation {
warnings = pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "jobTemplate", "spec", "template"), &newCronJob.Spec.JobTemplate.Spec.Template, &oldCronJob.Spec.JobTemplate.Spec.Template) warnings = job.WarningsForJobSpec(ctx, field.NewPath("spec", "jobTemplate", "spec"), &newCronJob.Spec.JobTemplate.Spec, &oldCronJob.Spec.JobTemplate.Spec)
} }
if strings.Contains(newCronJob.Spec.Schedule, "TZ") { if strings.Contains(newCronJob.Spec.Schedule, "TZ") {
warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule"))) warnings = append(warnings, fmt.Sprintf("CRON_TZ or TZ used in %s is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", field.NewPath("spec", "spec", "schedule")))

View File

@ -41,7 +41,10 @@ var (
TimeZone: pointer.String("Asia/Shanghai"), TimeZone: pointer.String("Asia/Shanghai"),
JobTemplate: batch.JobTemplateSpec{ JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{ Spec: batch.JobSpec{
Template: validPodTemplateSpec, Template: validPodTemplateSpec,
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(10),
Parallelism: pointer.Int32(10),
}, },
}, },
} }
@ -57,6 +60,10 @@ var (
} }
) )
func completionModePtr(m batch.CompletionMode) *batch.CompletionMode {
return &m
}
func TestCronJobStrategy(t *testing.T) { func TestCronJobStrategy(t *testing.T) {
ctx := genericapirequest.NewDefaultContext() ctx := genericapirequest.NewDefaultContext()
if !Strategy.NamespaceScoped() { if !Strategy.NamespaceScoped() {

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names" "k8s.io/apiserver/pkg/storage/names"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/job"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
@ -171,7 +172,7 @@ func (jobStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) []s
if msgs := utilvalidation.IsDNS1123Label(newJob.Name); len(msgs) != 0 { if msgs := utilvalidation.IsDNS1123Label(newJob.Name); len(msgs) != 0 {
warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS label is recommended: %v", msgs)) warnings = append(warnings, fmt.Sprintf("metadata.name: this is used in Pod names and hostnames, which can result in surprising behavior; a DNS label is recommended: %v", msgs))
} }
warnings = append(warnings, pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "template"), &newJob.Spec.Template, nil)...) warnings = append(warnings, job.WarningsForJobSpec(ctx, field.NewPath("spec"), &newJob.Spec, nil)...)
return warnings return warnings
} }
@ -261,7 +262,7 @@ func (jobStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Object
newJob := obj.(*batch.Job) newJob := obj.(*batch.Job)
oldJob := old.(*batch.Job) oldJob := old.(*batch.Job)
if newJob.Generation != oldJob.Generation { if newJob.Generation != oldJob.Generation {
warnings = pod.GetWarningsForPodTemplate(ctx, field.NewPath("spec", "template"), &newJob.Spec.Template, &oldJob.Spec.Template) warnings = job.WarningsForJobSpec(ctx, field.NewPath("spec"), &newJob.Spec, &oldJob.Spec)
} }
return warnings return warnings
} }

View File

@ -836,6 +836,37 @@ func TestJobStrategy_WarningsOnUpdate(t *testing.T) {
}, },
wantWarningsCount: 1, wantWarningsCount: 1,
}, },
"Invalid transition to high parallelism": {
wantWarningsCount: 1,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob2",
Namespace: metav1.NamespaceDefault,
Generation: 1,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(100_001),
Parallelism: pointer.Int32(10_001),
Template: validPodTemplateSpec,
},
},
oldJob: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob2",
Namespace: metav1.NamespaceDefault,
Generation: 0,
ResourceVersion: "0",
},
Spec: batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Completions: pointer.Int32(100_001),
Parallelism: pointer.Int32(10_000),
Template: validPodTemplateSpec,
},
},
},
} }
for val, tc := range cases { for val, tc := range cases {
t.Run(val, func(t *testing.T) { t.Run(val, func(t *testing.T) {
@ -853,18 +884,20 @@ func TestJobStrategy_WarningsOnCreate(t *testing.T) {
validSelector := &metav1.LabelSelector{ validSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{"a": "b"}, MatchLabels: map[string]string{"a": "b"},
} }
validSpec := batch.JobSpec{ validPodTemplate := api.PodTemplateSpec{
Selector: nil, ObjectMeta: metav1.ObjectMeta{
Template: api.PodTemplateSpec{ Labels: validSelector.MatchLabels,
ObjectMeta: metav1.ObjectMeta{
Labels: validSelector.MatchLabels,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
},
}, },
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
},
}
validSpec := batch.JobSpec{
CompletionMode: completionModePtr(batch.NonIndexedCompletion),
Selector: nil,
Template: validPodTemplate,
} }
testcases := map[string]struct { testcases := map[string]struct {
@ -892,6 +925,22 @@ func TestJobStrategy_WarningsOnCreate(t *testing.T) {
Spec: validSpec, Spec: validSpec,
}, },
}, },
"high completions and parallelism": {
wantWarningsCount: 1,
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob2",
Namespace: metav1.NamespaceDefault,
UID: theUID,
},
Spec: batch.JobSpec{
CompletionMode: completionModePtr(batch.IndexedCompletion),
Parallelism: pointer.Int32(100_001),
Completions: pointer.Int32(100_001),
Template: validPodTemplate,
},
},
},
} }
for name, tc := range testcases { for name, tc := range testcases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {