Add Job.spec.completionMode and Job.status.completedIndexes

And IndexedJob feature gate, disabled by default.
Update JobDescriber
This commit is contained in:
Aldo Culquicondor
2020-12-30 11:42:01 -05:00
parent 6404eda8de
commit a1a5868a5a
36 changed files with 642 additions and 157 deletions

View File

@@ -18,7 +18,6 @@ package fuzzer
import (
fuzz "github.com/google/gofuzz"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/apis/batch"
)
@@ -53,6 +52,11 @@ var Funcs = func(codecs runtimeserializer.CodecFactory) []interface{} {
} else {
j.ManualSelector = nil
}
if c.Rand.Int31()%2 == 0 {
j.CompletionMode = batch.NonIndexedCompletion
} else {
j.CompletionMode = batch.IndexedCompletion
}
},
func(sj *batch.CronJobSpec, c fuzz.Continue) {
c.FuzzNoCustom(sj)

View File

@@ -85,6 +85,22 @@ type JobTemplateSpec struct {
Spec JobSpec
}
// CompletionMode specifies how Pod completions of a Job are tracked.
type CompletionMode string
const (
// NonIndexedCompletion is a Job completion mode. In this mode, the Job is
// considered complete when there have been .spec.completions
// successfully completed Pods. Pod completions are homologous to each other.
NonIndexedCompletion CompletionMode = "NonIndexed"
// IndexedCompletion is a Job completion mode. In this mode, the Pods of a
// Job get an associated completion index from 0 to (.spec.completions - 1).
// The Job is considered complete when a Pod completes for each completion
// index.
IndexedCompletion CompletionMode = "Indexed"
)
// JobSpec describes how the job execution will look like.
type JobSpec struct {
@@ -149,6 +165,28 @@ type JobSpec struct {
// TTLAfterFinished feature.
// +optional
TTLSecondsAfterFinished *int32
// CompletionMode specifies how Pod completions are tracked. It can be
// `NonIndexed` (default) or `Indexed`.
//
// `NonIndexed` means that the Job is considered complete when there have
// been .spec.completions successfully completed Pods. Each Pod completion is
// homologous to each other.
//
// `Indexed` means that the Pods of a
// Job get an associated completion index from 0 to (.spec.completions - 1),
// available in the annotation batch.alpha.kubernetes.io/job-completion-index.
// The Job is considered complete when there is one successfully completed Pod
// for each index.
// When value is `Indexed`, .spec.completions must be specified and
// `.spec.parallelism` must be less than or equal to 10^5.
//
// This field is alpha-level and is only honored by servers that enable the
// IndexedJob feature gate. More completion modes can be added in the future.
// If the Job controller observes a mode that it doesn't recognize, the
// controller skips updates for the Job.
// +optional
CompletionMode CompletionMode
}
// JobStatus represents the current state of a Job.
@@ -183,6 +221,16 @@ type JobStatus struct {
// The number of pods which reached phase Failed.
// +optional
Failed int32
// CompletedIndexes holds the completed indexes when .spec.completionMode =
// "Indexed" in a text format. The indexes are represented as decimal integers
// separated by commas. The numbers are listed in increasing order. Three or
// more consecutive numbers are compressed and represented by the first and
// last element of the series, separated by a hyphen.
// For example, if the completed indexes are 1, 3, 4, 5 and 7, they are
// represented as "1,3-5,7".
// +optional
CompletedIndexes string
}
// JobConditionType is a valid value for JobCondition.Type

View File

@@ -46,4 +46,7 @@ func SetDefaults_Job(obj *batchv1.Job) {
if labels != nil && len(obj.Labels) == 0 {
obj.Labels = labels
}
if len(obj.Spec.CompletionMode) == 0 {
obj.Spec.CompletionMode = batchv1.NonIndexedCompletion
}
}

View File

@@ -21,15 +21,15 @@ import (
"testing"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
. "k8s.io/kubernetes/pkg/apis/batch/v1"
_ "k8s.io/kubernetes/pkg/apis/core/install"
utilpointer "k8s.io/utils/pointer"
. "k8s.io/kubernetes/pkg/apis/batch/v1"
)
func TestSetDefaultJob(t *testing.T) {
@@ -49,9 +49,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
@@ -69,9 +70,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
},
@@ -86,8 +88,9 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: utilpointer.Int32Ptr(0),
BackoffLimit: utilpointer.Int32Ptr(6),
Parallelism: utilpointer.Int32Ptr(0),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
@@ -103,8 +106,9 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: utilpointer.Int32Ptr(2),
BackoffLimit: utilpointer.Int32Ptr(6),
Parallelism: utilpointer.Int32Ptr(2),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
@@ -120,9 +124,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(2),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(2),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
@@ -138,9 +143,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(5),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(5),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
@@ -148,9 +154,10 @@ func TestSetDefaultJob(t *testing.T) {
"All set -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
CompletionMode: batchv1.NonIndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -158,9 +165,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
CompletionMode: batchv1.NonIndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -171,9 +179,10 @@ func TestSetDefaultJob(t *testing.T) {
"All set, flipped -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
CompletionMode: batchv1.IndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -181,9 +190,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
CompletionMode: batchv1.IndexedCompletion,
},
},
expectLabels: true,
@@ -191,37 +201,41 @@ func TestSetDefaultJob(t *testing.T) {
}
for name, test := range tests {
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
actual, ok := obj2.(*batchv1.Job)
if !ok {
t.Errorf("%s: unexpected object: %v", name, actual)
t.FailNow()
}
t.Run(name, func(t *testing.T) {
validateDefaultInt32(t, name, "Completions", actual.Spec.Completions, expected.Spec.Completions)
validateDefaultInt32(t, name, "Parallelism", actual.Spec.Parallelism, expected.Spec.Parallelism)
validateDefaultInt32(t, name, "BackoffLimit", actual.Spec.BackoffLimit, expected.Spec.BackoffLimit)
if test.expectLabels != reflect.DeepEqual(actual.Labels, actual.Spec.Template.Labels) {
if test.expectLabels {
t.Errorf("%s: expected: %v, got: %v", name, actual.Spec.Template.Labels, actual.Labels)
} else {
t.Errorf("%s: unexpected equality: %v", name, actual.Labels)
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
actual, ok := obj2.(*batchv1.Job)
if !ok {
t.Fatalf("Unexpected object: %v", actual)
}
}
validateDefaultInt32(t, "Completions", actual.Spec.Completions, expected.Spec.Completions)
validateDefaultInt32(t, "Parallelism", actual.Spec.Parallelism, expected.Spec.Parallelism)
validateDefaultInt32(t, "BackoffLimit", actual.Spec.BackoffLimit, expected.Spec.BackoffLimit)
if test.expectLabels != reflect.DeepEqual(actual.Labels, actual.Spec.Template.Labels) {
if test.expectLabels {
t.Errorf("Expected labels: %v, got: %v", actual.Spec.Template.Labels, actual.Labels)
} else {
t.Errorf("Unexpected equality: %v", actual.Labels)
}
}
if actual.Spec.CompletionMode != expected.Spec.CompletionMode {
t.Errorf("Got CompletionMode: %v, want: %v", actual.Spec.CompletionMode, expected.Spec.CompletionMode)
}
})
}
}
func validateDefaultInt32(t *testing.T, name string, field string, actual *int32, expected *int32) {
func validateDefaultInt32(t *testing.T, field string, actual *int32, expected *int32) {
if (actual == nil) != (expected == nil) {
t.Errorf("%s: got different *%s than expected: %v %v", name, field, actual, expected)
t.Errorf("Got different *%s than expected: %v %v", field, actual, expected)
}
if actual != nil && expected != nil {
if *actual != *expected {
t.Errorf("%s: got different %s than expected: %d %d", name, field, *actual, *expected)
t.Errorf("Got different %s than expected: %d %d", field, *actual, *expected)
}
}
}

View File

@@ -208,6 +208,7 @@ func autoConvert_v1_JobSpec_To_batch_JobSpec(in *v1.JobSpec, out *batch.JobSpec,
return err
}
out.TTLSecondsAfterFinished = (*int32)(unsafe.Pointer(in.TTLSecondsAfterFinished))
out.CompletionMode = batch.CompletionMode(in.CompletionMode)
return nil
}
@@ -222,6 +223,7 @@ func autoConvert_batch_JobSpec_To_v1_JobSpec(in *batch.JobSpec, out *v1.JobSpec,
return err
}
out.TTLSecondsAfterFinished = (*int32)(unsafe.Pointer(in.TTLSecondsAfterFinished))
out.CompletionMode = v1.CompletionMode(in.CompletionMode)
return nil
}
@@ -232,6 +234,7 @@ func autoConvert_v1_JobStatus_To_batch_JobStatus(in *v1.JobStatus, out *batch.Jo
out.Active = in.Active
out.Succeeded = in.Succeeded
out.Failed = in.Failed
out.CompletedIndexes = in.CompletedIndexes
return nil
}
@@ -247,6 +250,7 @@ func autoConvert_batch_JobStatus_To_v1_JobStatus(in *batch.JobStatus, out *v1.Jo
out.Active = in.Active
out.Succeeded = in.Succeeded
out.Failed = in.Failed
out.CompletedIndexes = in.CompletedIndexes
return nil
}

View File

@@ -31,6 +31,11 @@ import (
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
)
// maxParallelismForIndexJob is the maximum parallelism that an Indexed Job
// is allowed to have. This threshold allows to cap the length of
// .status.completedIndexes.
const maxParallelismForIndexedJob = 100000
// ValidateGeneratedSelector validates that the generated selector on a controller object match the controller object
// metadata, and the labels on the pod template are as generated.
//
@@ -124,6 +129,20 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
if spec.TTLSecondsAfterFinished != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.TTLSecondsAfterFinished), fldPath.Child("ttlSecondsAfterFinished"))...)
}
// CompletionMode might be empty when IndexedJob feature gate is disabled.
if spec.CompletionMode != "" {
if spec.CompletionMode != batch.NonIndexedCompletion && spec.CompletionMode != batch.IndexedCompletion {
allErrs = append(allErrs, field.NotSupported(fldPath.Child("completionMode"), spec.CompletionMode, []string{string(batch.NonIndexedCompletion), string(batch.IndexedCompletion)}))
}
if spec.CompletionMode == batch.IndexedCompletion {
if spec.Completions == nil {
allErrs = append(allErrs, field.Required(fldPath.Child("completions"), fmt.Sprintf("when completion mode is %s", batch.IndexedCompletion)))
}
if spec.Parallelism != nil && *spec.Parallelism > maxParallelismForIndexedJob {
allErrs = append(allErrs, field.Invalid(fldPath.Child("parallelism"), *spec.Parallelism, fmt.Sprintf("must be less than or equal to %d when completion mode is %s", maxParallelismForIndexedJob, batch.IndexedCompletion)))
}
}
}
allErrs = append(allErrs, apivalidation.ValidatePodTemplateSpec(&spec.Template, fldPath.Child("template"), opts)...)
@@ -170,6 +189,7 @@ func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opt
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Template, oldSpec.Template, fldPath.Child("template"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
return allErrs
}

View File

@@ -78,7 +78,7 @@ func TestValidateJob(t *testing.T) {
validPodTemplateSpecForGenerated := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
successCases := map[string]batch.Job{
"manual selector": {
"valid manual selector": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
@@ -90,7 +90,7 @@ func TestValidateJob(t *testing.T) {
Template: validPodTemplateSpecForManual,
},
},
"generated selector": {
"valid generated selector": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
@@ -101,6 +101,32 @@ func TestValidateJob(t *testing.T) {
Template: validPodTemplateSpecForGenerated,
},
},
"valid NonIndexed completion mode": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
CompletionMode: batch.NonIndexedCompletion,
},
},
"valid Indexed completion mode": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
CompletionMode: batch.IndexedCompletion,
Completions: pointer.Int32Ptr(2),
Parallelism: pointer.Int32Ptr(100000),
},
},
}
for k, v := range successCases {
t.Run(k, func(t *testing.T) {
@@ -158,7 +184,7 @@ func TestValidateJob(t *testing.T) {
Template: validPodTemplateSpecForGenerated,
},
},
"spec.template.metadata.labels: Invalid value: {\"y\":\"z\"}: `selector` does not match template `labels`": {
"spec.template.metadata.labels: Invalid value: map[string]string{\"y\":\"z\"}: `selector` does not match template `labels`": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
@@ -179,7 +205,7 @@ func TestValidateJob(t *testing.T) {
},
},
},
"spec.template.metadata.labels: Invalid value: {\"controller-uid\":\"4d5e6f\"}: `selector` does not match template `labels`": {
"spec.template.metadata.labels: Invalid value: map[string]string{\"controller-uid\":\"4d5e6f\"}: `selector` does not match template `labels`": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
@@ -242,7 +268,7 @@ func TestValidateJob(t *testing.T) {
},
},
},
"spec.ttlSecondsAfterFinished:must be greater than or equal to 0": {
"spec.ttlSecondsAfterFinished: must be greater than or equal to 0": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
@@ -254,6 +280,32 @@ func TestValidateJob(t *testing.T) {
Template: validPodTemplateSpecForGenerated,
},
},
"spec.completions: Required value: when completion mode is Indexed": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
CompletionMode: batch.IndexedCompletion,
},
},
"spec.parallelism: must be less than or equal to 100000 when completion mode is Indexed": {
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
CompletionMode: batch.IndexedCompletion,
Completions: pointer.Int32Ptr(2),
Parallelism: pointer.Int32Ptr(100001),
},
},
}
for k, v := range errorCases {
@@ -262,7 +314,7 @@ func TestValidateJob(t *testing.T) {
if len(errs) == 0 {
t.Errorf("expected failure for %s", k)
} else {
s := strings.Split(k, ":")
s := strings.SplitN(k, ":", 2)
err := errs[0]
if err.Field != s[0] || !strings.Contains(err.Error(), s[1]) {
t.Errorf("unexpected error: %v, expected: %s", err, k)
@@ -346,6 +398,24 @@ func TestValidateJobUpdate(t *testing.T) {
Field: "spec.template",
},
},
"immutable completion mode": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
CompletionMode: batch.IndexedCompletion,
Completions: pointer.Int32Ptr(2),
},
},
update: func(job *batch.Job) {
job.Spec.CompletionMode = batch.NonIndexedCompletion
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.completionMode",
},
},
}
ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")
for k, tc := range cases {