add podReplacementPolicy and terminating field to job api

This commit is contained in:
kannon92
2023-07-18 17:25:15 +00:00
parent dde22b3a5e
commit ce92952037
31 changed files with 715 additions and 159 deletions

View File

@@ -63,6 +63,11 @@ var Funcs = func(codecs runtimeserializer.CodecFactory) []interface{} {
// We're fuzzing the internal JobSpec type, not the v1 type, so we don't
// need to fuzz the nil value.
j.Suspend = pointer.Bool(c.RandBool())
podReplacementPolicy := batch.TerminatingOrFailed
if c.RandBool() {
podReplacementPolicy = batch.Failed
}
j.PodReplacementPolicy = &podReplacementPolicy
},
func(sj *batch.CronJobSpec, c fuzz.Continue) {
c.FuzzNoCustom(sj)

View File

@@ -151,6 +151,19 @@ const (
PodFailurePolicyOnExitCodesOpNotIn PodFailurePolicyOnExitCodesOperator = "NotIn"
)
// PodReplacementPolicy specifies the policy for creating pod replacements.
// +enum
type PodReplacementPolicy string
const (
// TerminatingOrFailed means that we recreate pods
// when they are terminating (has a metadata.deletionTimestamp) or failed.
TerminatingOrFailed PodReplacementPolicy = "TerminatingOrFailed"
//Failed means to wait until a previously created Pod is fully terminated (has phase
//Failed or Succeeded) before creating a replacement Pod.
Failed PodReplacementPolicy = "Failed"
)
// PodFailurePolicyOnExitCodesRequirement describes the requirement for handling
// a failed pod based on its container exit codes. In particular, it lookups the
// .state.terminated.exitCode for each app container and init container status,
@@ -381,6 +394,19 @@ type JobSpec struct {
//
// +optional
Suspend *bool
// podReplacementPolicy specifies when to create replacement Pods.
// Possible values are:
// - TerminatingOrFailed means that we recreate pods
// when they are terminating (has a metadata.deletionTimestamp) or failed.
// - Failed means to wait until a previously created Pod is fully terminated (has phase
// Failed or Succeeded) before creating a replacement Pod.
//
// When using podFailurePolicy, Failed is the the only allowed value.
// TerminatingOrFailed and Failed are allowed values when podFailurePolicy is not in use.
// This is an alpha field. Enable JobPodReplacementPolicy to be able to use this field.
// +optional
PodReplacementPolicy *PodReplacementPolicy
}
// JobStatus represents the current state of a Job.
@@ -413,6 +439,14 @@ type JobStatus struct {
// +optional
Active int32
// The number of pods which are terminating (in phase Pending or Running
// and have a deletionTimestamp).
//
// This field is alpha-level. The job controller populates the field when
// the feature gate JobPodReplacementPolicy is enabled (disabled by default).
// +optional
Terminating *int32
// The number of active pods which have a Ready condition.
//
// This field is beta-level. The job controller populates the field when

View File

@@ -22,6 +22,8 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)
@@ -68,6 +70,15 @@ func SetDefaults_Job(obj *batchv1.Job) {
}
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
if obj.Spec.PodReplacementPolicy == nil {
if obj.Spec.PodFailurePolicy != nil {
obj.Spec.PodReplacementPolicy = podReplacementPolicyPtr(batchv1.Failed)
} else {
obj.Spec.PodReplacementPolicy = podReplacementPolicyPtr(batchv1.TerminatingOrFailed)
}
}
}
}
func SetDefaults_CronJob(obj *batchv1.CronJob) {
@@ -84,3 +95,7 @@ func SetDefaults_CronJob(obj *batchv1.CronJob) {
obj.Spec.FailedJobsHistoryLimit = utilpointer.Int32(1)
}
}
func podReplacementPolicyPtr(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
return &obj
}

View File

@@ -26,9 +26,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
. "k8s.io/kubernetes/pkg/apis/batch/v1"
@@ -40,9 +43,10 @@ func TestSetDefaultJob(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
}
tests := map[string]struct {
original *batchv1.Job
expected *batchv1.Job
expectLabels bool
original *batchv1.Job
expected *batchv1.Job
expectLabels bool
enablePodReplacementPolicy bool
}{
"Pod failure policy with some field values unspecified -> set default values": {
original: &batchv1.Job{
@@ -135,6 +139,70 @@ func TestSetDefaultJob(t *testing.T) {
},
expectLabels: true,
},
"Pod failure policy and defaulting for pod replacement policy": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(1),
Parallelism: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
},
},
},
},
expectLabels: true,
enablePodReplacementPolicy: true,
},
"All unspecified and podReplacementPolicyEnabled -> sets all to default values": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(1),
Parallelism: pointer.Int32(1),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
},
},
expectLabels: true,
enablePodReplacementPolicy: true,
},
"All unspecified -> sets all to default values": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
@@ -295,11 +363,12 @@ func TestSetDefaultJob(t *testing.T) {
"All set -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -307,11 +376,12 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
Completions: pointer.Int32(8),
Parallelism: pointer.Int32(9),
BackoffLimit: pointer.Int32(10),
CompletionMode: completionModePtr(batchv1.NonIndexedCompletion),
Suspend: pointer.Bool(false),
PodReplacementPolicy: podReplacementPtr(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -322,11 +392,12 @@ func TestSetDefaultJob(t *testing.T) {
"All set, flipped -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
@@ -334,11 +405,12 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
Completions: pointer.Int32(11),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(9),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
Suspend: pointer.Bool(true),
PodReplacementPolicy: podReplacementPtr(batchv1.Failed),
},
},
expectLabels: true,
@@ -396,6 +468,7 @@ func TestSetDefaultJob(t *testing.T) {
for name, test := range tests {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enablePodReplacementPolicy)()
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
@@ -424,6 +497,9 @@ func TestSetDefaultJob(t *testing.T) {
if diff := cmp.Diff(expected.Spec.CompletionMode, actual.Spec.CompletionMode); diff != "" {
t.Errorf("Unexpected CompletionMode (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(expected.Spec.PodReplacementPolicy, actual.Spec.PodReplacementPolicy); diff != "" {
t.Errorf("Unexpected PodReplacementPolicy (-want,+got):\n%s", diff)
}
})
}
}
@@ -522,3 +598,7 @@ func TestSetDefaultCronJob(t *testing.T) {
func completionModePtr(m batchv1.CompletionMode) *batchv1.CompletionMode {
return &m
}
func podReplacementPtr(m batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
return &m
}

View File

@@ -451,6 +451,7 @@ func autoConvert_v1_JobSpec_To_batch_JobSpec(in *v1.JobSpec, out *batch.JobSpec,
out.TTLSecondsAfterFinished = (*int32)(unsafe.Pointer(in.TTLSecondsAfterFinished))
out.CompletionMode = (*batch.CompletionMode)(unsafe.Pointer(in.CompletionMode))
out.Suspend = (*bool)(unsafe.Pointer(in.Suspend))
out.PodReplacementPolicy = (*batch.PodReplacementPolicy)(unsafe.Pointer(in.PodReplacementPolicy))
return nil
}
@@ -470,6 +471,7 @@ func autoConvert_batch_JobSpec_To_v1_JobSpec(in *batch.JobSpec, out *v1.JobSpec,
out.TTLSecondsAfterFinished = (*int32)(unsafe.Pointer(in.TTLSecondsAfterFinished))
out.CompletionMode = (*v1.CompletionMode)(unsafe.Pointer(in.CompletionMode))
out.Suspend = (*bool)(unsafe.Pointer(in.Suspend))
out.PodReplacementPolicy = (*v1.PodReplacementPolicy)(unsafe.Pointer(in.PodReplacementPolicy))
return nil
}
@@ -480,6 +482,7 @@ func autoConvert_v1_JobStatus_To_batch_JobStatus(in *v1.JobStatus, out *batch.Jo
out.Active = in.Active
out.Succeeded = in.Succeeded
out.Failed = in.Failed
out.Terminating = (*int32)(unsafe.Pointer(in.Terminating))
out.CompletedIndexes = in.CompletedIndexes
out.FailedIndexes = (*string)(unsafe.Pointer(in.FailedIndexes))
out.UncountedTerminatedPods = (*batch.UncountedTerminatedPods)(unsafe.Pointer(in.UncountedTerminatedPods))
@@ -497,6 +500,7 @@ func autoConvert_batch_JobStatus_To_v1_JobStatus(in *batch.JobStatus, out *v1.Jo
out.StartTime = (*metav1.Time)(unsafe.Pointer(in.StartTime))
out.CompletionTime = (*metav1.Time)(unsafe.Pointer(in.CompletionTime))
out.Active = in.Active
out.Terminating = (*int32)(unsafe.Pointer(in.Terminating))
out.Ready = (*int32)(unsafe.Pointer(in.Ready))
out.Succeeded = in.Succeeded
out.Failed = in.Failed

View File

@@ -77,6 +77,10 @@ var (
string(api.ConditionFalse),
string(api.ConditionTrue),
string(api.ConditionUnknown))
supportedPodRecreationPolicy = sets.New(
string(batch.Failed),
string(batch.TerminatingOrFailed))
)
// validateGeneratedSelector validates that the generated selector on a controller object match the controller object
@@ -244,6 +248,8 @@ func validateJobSpec(spec *batch.JobSpec, fldPath *field.Path, opts apivalidatio
allErrs = append(allErrs, validatePodFailurePolicy(spec, fldPath.Child("podFailurePolicy"))...)
}
allErrs = append(allErrs, validatePodReplacementPolicy(spec, fldPath.Child("podReplacementPolicy"))...)
allErrs = append(allErrs, apivalidation.ValidatePodTemplateSpec(&spec.Template, fldPath.Child("template"), opts)...)
// spec.Template.Spec.RestartPolicy can be defaulted as RestartPolicyAlways
@@ -281,6 +287,22 @@ func validatePodFailurePolicy(spec *batch.JobSpec, fldPath *field.Path) field.Er
return allErrs
}
func validatePodReplacementPolicy(spec *batch.JobSpec, fldPath *field.Path) field.ErrorList {
var allErrs field.ErrorList
if spec.PodReplacementPolicy != nil {
// If PodFailurePolicy is specified then we only allow Failed.
if spec.PodFailurePolicy != nil {
if *spec.PodReplacementPolicy != batch.Failed {
allErrs = append(allErrs, field.NotSupported(fldPath, *spec.PodReplacementPolicy, []string{string(batch.Failed)}))
}
// If PodFailurePolicy not specified we allow values in supportedPodRecreationPolicy.
} else if !supportedPodRecreationPolicy.Has(string(*spec.PodReplacementPolicy)) {
allErrs = append(allErrs, field.NotSupported(fldPath, *spec.PodReplacementPolicy, sets.List(supportedPodRecreationPolicy)))
}
}
return allErrs
}
func validatePodFailurePolicyRule(spec *batch.JobSpec, rule *batch.PodFailurePolicyRule, rulePath *field.Path, containerNames sets.String) field.ErrorList {
var allErrs field.ErrorList
actionPath := rulePath.Child("action")
@@ -375,6 +397,9 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
if status.Ready != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*status.Ready), fldPath.Child("ready"))...)
}
if status.Terminating != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*status.Terminating), fldPath.Child("terminating"))...)
}
if status.UncountedTerminatedPods != nil {
path := fldPath.Child("uncountedTerminatedPods")
seen := sets.NewString()

View File

@@ -94,6 +94,8 @@ func TestValidateJob(t *testing.T) {
UID: types.UID("1a2b3c"),
}
validManualSelector := getValidManualSelector()
failedPodReplacement := batch.Failed
terminatingOrFailedPodReplacement := batch.TerminatingOrFailed
validPodTemplateSpecForManual := getValidPodTemplateSpecForManual(validManualSelector)
validGeneratedSelector := getValidGeneratedSelector()
validPodTemplateSpecForGenerated := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
@@ -212,6 +214,36 @@ func TestValidateJob(t *testing.T) {
},
},
},
"valid pod replacement": {
opts: JobValidationOptions{RequirePrefixedLabels: true},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
PodReplacementPolicy: &terminatingOrFailedPodReplacement,
},
},
},
"valid pod replacement with failed": {
opts: JobValidationOptions{RequirePrefixedLabels: true},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
PodReplacementPolicy: &failedPodReplacement,
},
},
},
"valid hostnet": {
opts: JobValidationOptions{RequirePrefixedLabels: true},
job: batch.Job{
@@ -749,6 +781,38 @@ func TestValidateJob(t *testing.T) {
},
opts: JobValidationOptions{RequirePrefixedLabels: true},
},
`spec.podReplacementPolicy: Unsupported value: "TerminatingOrFailed": supported values: "Failed"`: {
job: batch.Job{
ObjectMeta: validJobObjectMeta,
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
PodReplacementPolicy: &terminatingOrFailedPodReplacement,
Template: validPodTemplateSpecForGeneratedRestartPolicyNever,
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{{
Action: batch.PodFailurePolicyActionIgnore,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
Type: api.DisruptionTarget,
Status: api.ConditionTrue,
}},
},
},
},
},
},
opts: JobValidationOptions{RequirePrefixedLabels: true},
},
`spec.podReplacementPolicy: Unsupported value: "": supported values: "Failed", "TerminatingOrFailed"`: {
job: batch.Job{
ObjectMeta: validJobObjectMeta,
Spec: batch.JobSpec{
PodReplacementPolicy: (*batch.PodReplacementPolicy)(pointer.String("")),
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGeneratedRestartPolicyNever,
},
},
opts: JobValidationOptions{RequirePrefixedLabels: true},
},
`spec.template.spec.restartPolicy: Invalid value: "OnFailure": only "Never" is supported when podFailurePolicy is specified`: {
job: batch.Job{
ObjectMeta: validJobObjectMeta,
@@ -1962,9 +2026,10 @@ func TestValidateJobUpdateStatus(t *testing.T) {
ResourceVersion: "1",
},
Status: batch.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
Active: 1,
Succeeded: 2,
Failed: 3,
Terminating: pointer.Int32(4),
},
},
update: batch.Job{
@@ -1974,14 +2039,15 @@ func TestValidateJobUpdateStatus(t *testing.T) {
ResourceVersion: "1",
},
Status: batch.JobStatus{
Active: 2,
Succeeded: 3,
Failed: 4,
Ready: pointer.Int32(1),
Active: 2,
Succeeded: 3,
Failed: 4,
Ready: pointer.Int32(1),
Terminating: pointer.Int32(4),
},
},
},
"nil ready": {
"nil ready and terminating": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "abc",
@@ -2015,9 +2081,10 @@ func TestValidateJobUpdateStatus(t *testing.T) {
ResourceVersion: "10",
},
Status: batch.JobStatus{
Active: 1,
Succeeded: 2,
Failed: 3,
Active: 1,
Succeeded: 2,
Failed: 3,
Terminating: pointer.Int32(4),
},
},
update: batch.Job{
@@ -2027,10 +2094,11 @@ func TestValidateJobUpdateStatus(t *testing.T) {
ResourceVersion: "10",
},
Status: batch.JobStatus{
Active: -1,
Succeeded: -2,
Failed: -3,
Ready: pointer.Int32(-1),
Active: -1,
Succeeded: -2,
Failed: -3,
Ready: pointer.Int32(-1),
Terminating: pointer.Int32(-2),
},
},
wantErrs: field.ErrorList{
@@ -2038,6 +2106,7 @@ func TestValidateJobUpdateStatus(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "status.succeeded"},
{Type: field.ErrorTypeInvalid, Field: "status.failed"},
{Type: field.ErrorTypeInvalid, Field: "status.ready"},
{Type: field.ErrorTypeInvalid, Field: "status.terminating"},
},
},
"empty and duplicated uncounted pods": {

View File

@@ -303,6 +303,11 @@ func (in *JobSpec) DeepCopyInto(out *JobSpec) {
*out = new(bool)
**out = **in
}
if in.PodReplacementPolicy != nil {
in, out := &in.PodReplacementPolicy, &out.PodReplacementPolicy
*out = new(PodReplacementPolicy)
**out = **in
}
return
}
@@ -334,6 +339,11 @@ func (in *JobStatus) DeepCopyInto(out *JobStatus) {
in, out := &in.CompletionTime, &out.CompletionTime
*out = (*in).DeepCopy()
}
if in.Terminating != nil {
in, out := &in.Terminating, &out.Terminating
*out = new(int32)
**out = **in
}
if in.Ready != nil {
in, out := &in.Ready, &out.Ready
*out = new(int32)