Delay setting terminal Job conditions until all pods are terminal

Fix the integration test typecheck

Fix after rebase

# Conflicts:
#	pkg/controller/job/job_controller_test.go
Michal Wozniak 2024-07-11 20:54:09 +02:00
parent e5ff4b8fcd
commit fb7704ba03
8 changed files with 1177 additions and 59 deletions
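In short, the change works like this (a hedged sketch, not code from the diff): when the JobManagedBy or JobPodReplacementPolicy feature gate is enabled, the controller records only the interim FailureTarget or SuccessCriteriaMet condition at the moment the Job's fate is decided, and appends the terminal Failed or Complete condition in a later sync, once no pods are terminating. The helper below is hypothetical; the real logic lives in delayTerminalCondition() and enactJobFinished() in the controller diff further down.

// Hypothetical summary of the gating rule introduced by this commit.
func canMarkJobTerminal(jobCtx *syncJobCtx) bool {
	// The terminal Failed/Complete condition is withheld while pods are still
	// terminating, but only when one of the two feature gates is enabled.
	if delayTerminalCondition() && jobCtx.terminating != nil && *jobCtx.terminating > 0 {
		return false
	}
	return true
}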

View File

@@ -568,7 +568,7 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValidationOptions)
}
}
}
-if ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion && isJobSuccessCriteriaMet(job) {
+if !opts.AllowForSuccessCriteriaMetInExtendedScope && ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion && isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet to NonIndexed Job"))
}
if isJobSuccessCriteriaMet(job) && IsJobFailed(job) {
@@ -577,7 +577,7 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValidationOptions)
if isJobSuccessCriteriaMet(job) && isJobFailureTarget(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True and FailureTarget=true conditions"))
}
-if job.Spec.SuccessPolicy == nil && isJobSuccessCriteriaMet(job) {
+if !opts.AllowForSuccessCriteriaMetInExtendedScope && job.Spec.SuccessPolicy == nil && isJobSuccessCriteriaMet(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True for Job without SuccessPolicy"))
}
if job.Spec.SuccessPolicy != nil && !isJobSuccessCriteriaMet(job) && IsJobComplete(job) {
@@ -1015,4 +1015,5 @@ type JobStatusValidationOptions struct {
RejectNotCompleteJobWithCompletionTime bool
RejectCompleteJobWithFailedCondition bool
RejectCompleteJobWithFailureTargetCondition bool
+AllowForSuccessCriteriaMetInExtendedScope bool
}
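For illustration only (this snippet is not part of the commit): with AllowForSuccessCriteriaMetInExtendedScope set, a status update that adds SuccessCriteriaMet=True to a NonIndexed Job without a successPolicy now passes validateJobStatus, whereas previously both checks above would have rejected it. The literal below reuses the batch API types already shown in this diff.

status := batch.JobStatus{
	Conditions: []batch.JobCondition{
		{Type: batch.JobSuccessCriteriaMet, Status: api.ConditionTrue},
	},
}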

View File

@@ -844,7 +844,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// Given that the Job already has the SuccessCriteriaMet condition, the termination condition already had confirmed in another cycle.
// So, the job-controller evaluates the podFailurePolicy only when the Job doesn't have the SuccessCriteriaMet condition.
-if jobCtx.finishedCondition == nil && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
+if jobCtx.finishedCondition == nil && (feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition()) {
failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget)
if failureTargetCondition != nil && failureTargetCondition.Status == v1.ConditionTrue {
jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
@@ -857,9 +857,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
// check if the number of pod restart exceeds backoff (for restart OnFailure only)
// OR if the number of failed jobs increased since the last syncJob
-jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit", jm.clock.Now())
+jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit")
} else if jm.pastActiveDeadline(&job) {
-jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
+jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline")
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -874,9 +874,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods)
if jobCtx.finishedCondition == nil {
if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) {
-jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now())
+jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes")
} else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) {
-jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonFailedIndexes, "Job has failed indexes", jm.clock.Now())
+jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonFailedIndexes, "Job has failed indexes")
}
}
jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
@@ -925,7 +925,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
}
if complete {
-jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+jobCtx.finishedCondition = jm.newSuccessCondition()
} else if manageJobCalled {
// Update the conditions / emit events only if manageJob was called in
// this syncJob. Otherwise wait for the right syncJob call to make
@@ -975,6 +975,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
return manageJobErr
}
func (jm *Controller) newFailureCondition(reason, message string) *batch.JobCondition {
cType := batch.JobFailed
if delayTerminalCondition() {
cType = batch.JobFailureTarget
}
return newCondition(cType, v1.ConditionTrue, reason, message, jm.clock.Now())
}
func (jm *Controller) newSuccessCondition() *batch.JobCondition {
cType := batch.JobComplete
if delayTerminalCondition() {
cType = batch.JobSuccessCriteriaMet
}
return newCondition(cType, v1.ConditionTrue, "", "", jm.clock.Now())
}
func delayTerminalCondition() bool {
return feature.DefaultFeatureGate.Enabled(features.JobManagedBy) ||
feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy)
}
// deleteActivePods issues deletion for active Pods, preserving finalizers.
// This is done through DELETE calls that set deletion timestamps.
// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
@@ -1165,7 +1186,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
needsFlush = true
}
}
-if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
+if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition() {
if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
// Append the interim FailureTarget condition to update the job status with before finalizers are removed.
@@ -1378,6 +1399,12 @@ func (jm *Controller) enactJobFinished(logger klog.Logger, jobCtx *syncJobCtx) bool {
return false
}
}
if delayTerminalCondition() {
if *jobCtx.terminating > 0 {
logger.V(4).Info("Delaying marking the Job as finished, because there are still terminating pod(s)", "job", klog.KObj(job), "condition", jobCtx.finishedCondition.Type, "count", *jobCtx.terminating)
return false
}
}
finishedCond := jobCtx.finishedCondition
job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message, jm.clock.Now())
if finishedCond.Type == batch.JobComplete {
@@ -1964,12 +1991,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
// trackTerminatingPods checks if the count of terminating pods is tracked.
// They are tracked when any the following is true:
-// - JobPodReplacementPolicy is enabled to be returned in the status field,
+// - JobPodReplacementPolicy is enabled to be returned in the status field;
+//   and to delay setting the Job terminal condition,
+// - JobManagedBy is enabled to delay setting Job terminal condition,
// - only failed pods are replaced, because pod failure policy is used
func trackTerminatingPods(job *batch.Job) bool {
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
return true
}
+if feature.DefaultFeatureGate.Enabled(features.JobManagedBy) {
+return true
+}
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
}
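A practical note, with a hedged, self-contained sketch (it assumes only the public k8s.io/api types and is not part of this commit): because the terminal condition can now lag behind the interim one, consumers that need the final verdict should wait for JobComplete or JobFailed rather than for SuccessCriteriaMet or FailureTarget.

package jobutil

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// jobFinished reports whether the Job carries a terminal condition and, if so,
// which one. The interim FailureTarget/SuccessCriteriaMet conditions are
// deliberately ignored.
func jobFinished(job *batchv1.Job) (batchv1.JobConditionType, bool) {
	for _, c := range job.Status.Conditions {
		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
			return c.Type, true
		}
	}
	return "", false
}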

View File

@@ -292,6 +292,7 @@ func TestControllerSyncJob(t *testing.T) {
jobPodReplacementPolicy bool
jobPodFailurePolicy bool
jobSuccessPolicy bool
+jobManagedBy bool
}{
"job start": {
parallelism: 2,
@@ -827,7 +828,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedCompletedIdxs: "0",
expectedConditions: []batch.JobCondition{
{
-Type: batch.JobFailed,
+Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
@@ -1220,6 +1221,32 @@ func TestControllerSyncJob(t *testing.T) {
},
},
},
"backoff limit exceeded; JobManagedBy enabled": {
jobManagedBy: true,
parallelism: 2,
completions: 3,
backoffLimit: 0,
completionMode: batch.IndexedCompletion,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodFailed},
{"2", v1.PodRunning},
},
expectedSucceeded: 1,
expectedFailed: 2,
expectedCompletedIdxs: "0",
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
},
expectedPodPatches: 3,
expectedReady: ptr.To[int32](0),
expectedDeletions: 1,
},
}
for name, tc := range testCases {
@@ -1229,6 +1256,7 @@ func TestControllerSyncJob(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobPodFailurePolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.jobSuccessPolicy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.jobManagedBy)
// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -1546,6 +1574,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
completedCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", now)
succeededCond := newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, "", "", minuteAgo)
failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", now)
+failureTargetCond := newCondition(batch.JobFailureTarget, v1.ConditionTrue, "", "", now)
indexedCompletion := batch.IndexedCompletion
mockErr := errors.New("mock error")
cases := map[string]struct {
@@ -1565,6 +1594,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
// features
enableJobBackoffLimitPerIndex bool
enableJobSuccessPolicy bool
+enableJobPodReplacementPolicy bool
+enableJobManagedBy bool
}{
"no updates": {},
"new active": {
@@ -2114,6 +2145,215 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
},
"pod is terminal; JobFailed condition is set": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod,
},
finishedCond: failedCond,
wantRmFinalizers: 1,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 1,
Conditions: []batch.JobCondition{*failedCond},
},
},
wantFailedPodsMetric: 1,
},
"pod is terminating; counted as failed, but the JobFailed condition is delayed; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
},
Conditions: []batch.JobCondition{*failureTargetCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: failedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 1,
Conditions: []batch.JobCondition{*failureTargetCond},
},
},
wantFailedPodsMetric: 1,
},
"pod is terminating; counted as failed, but the JobFailed condition is delayed; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
},
Conditions: []batch.JobCondition{*failureTargetCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: failedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 1,
Conditions: []batch.JobCondition{*failureTargetCond},
},
},
wantFailedPodsMetric: 1,
},
"pod is terminating; counted as failed, JobFailed condition is not delayed; JobPodReplacementPolicy and JobManagedBy disabled": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
},
Conditions: []batch.JobCondition{*failureTargetCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: failedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 1,
Conditions: []batch.JobCondition{*failureTargetCond},
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 1,
Conditions: []batch.JobCondition{*failureTargetCond, *failedCond},
},
},
wantFailedPodsMetric: 1,
},
"pod is terminating; JobSuccessCriteriaMet, but JobComplete condition is delayed; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
Succeeded: []types.UID{"b"},
},
Conditions: []batch.JobCondition{*succeededCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: completedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 1,
Failed: 1,
Conditions: []batch.JobCondition{*succeededCond},
},
},
wantFailedPodsMetric: 1,
wantSucceededPodsMetric: 1,
},
"pod is terminating; JobSuccessCriteriaMet, but JobComplete condition is delayed; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
Succeeded: []types.UID{"b"},
},
Conditions: []batch.JobCondition{*succeededCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: completedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 1,
Failed: 1,
Conditions: []batch.JobCondition{*succeededCond},
},
},
wantFailedPodsMetric: 1,
wantSucceededPodsMetric: 1,
},
"pod is terminating; JobSuccessCriteriaMet, JobComplete condition is not delayed; JobPodReplacementPolicy and JobManagedBy disabled": {
enableJobSuccessPolicy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
Succeeded: []types.UID{"b"},
},
Conditions: []batch.JobCondition{*succeededCond},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod,
},
finishedCond: completedCond,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 1,
Failed: 1,
Conditions: []batch.JobCondition{*succeededCond},
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 1,
Failed: 1,
Conditions: []batch.JobCondition{*succeededCond, *completedCond},
CompletionTime: ptr.To(metav1.NewTime(now)),
},
},
wantFailedPodsMetric: 1,
wantSucceededPodsMetric: 1,
},
"indexed job with a failed pod with delayed finalizer removal; the pod is not counted": { "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
job: batch.Job{ job: batch.Job{
@ -2200,6 +2440,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
@ -2222,14 +2464,15 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
expectedRmFinalizers: tc.expectedRmFinalizers, expectedRmFinalizers: tc.expectedRmFinalizers,
finishedCondition: tc.finishedCond, finishedCondition: tc.finishedCond,
} }
jobCtx.activePods = controller.FilterActivePods(logger, tc.pods)
if isIndexedJob(job) { if isIndexedJob(job) {
jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil { if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil {
jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods) jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods)
jobCtx.activePods = controller.FilterActivePods(logger, tc.pods)
jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
} }
} }
jobCtx.terminating = ptr.To(controller.CountTerminatingPods(tc.pods))
err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush)
if !errors.Is(err, tc.wantErr) { if !errors.Is(err, tc.wantErr) {
@@ -2287,6 +2530,10 @@ func TestSyncJobPastDeadline(t *testing.T) {
expectedSucceeded int32
expectedFailed int32
expectedConditions []batch.JobCondition
+// features
+enableJobPodReplacementPolicy bool
+enableJobManagedBy bool
}{
"activeDeadlineSeconds less than single pod execution": {
parallelism: 1,
@@ -2373,10 +2620,79 @@ func TestSyncJobPastDeadline(t *testing.T) {
},
},
},
"activeDeadlineSeconds exceeded; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
parallelism: 1,
completions: 2,
activeDeadlineSeconds: 10,
startTime: 15,
backoffLimit: 6,
activePods: 1,
succeededPods: 1,
expectedDeletions: 1,
expectedSucceeded: 1,
expectedFailed: 1,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonDeadlineExceeded,
Message: "Job was active longer than specified deadline",
},
},
},
"activeDeadlineSeconds exceeded; JobManagedBy enabled": {
enableJobManagedBy: true,
parallelism: 1,
completions: 2,
activeDeadlineSeconds: 10,
startTime: 15,
backoffLimit: 6,
activePods: 1,
succeededPods: 1,
expectedDeletions: 1,
expectedSucceeded: 1,
expectedFailed: 1,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonDeadlineExceeded,
Message: "Job was active longer than specified deadline",
},
},
},
"activeDeadlineSeconds exceeded and backofflimit reached; JobManagedBy enabled": {
enableJobManagedBy: true,
parallelism: 1,
completions: 1,
activeDeadlineSeconds: 1,
startTime: 10,
failedPods: 1,
expectedFailed: 1,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
@@ -3894,6 +4210,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
enableBackoffLimitPerIndex bool
enableJobSuccessPolicy bool
enableJobPodReplacementPolicy bool
+enableJobManagedBy bool
job batch.Job
pods []v1.Pod
wantStatus batch.JobStatus
@@ -3938,12 +4255,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
},
-{
-Type: batch.JobComplete,
-Status: v1.ConditionTrue,
-Reason: batch.JobReasonSuccessPolicy,
-Message: "Matched rules at index 0",
-},
},
},
},
@@ -4044,12 +4355,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
},
-{
-Type: batch.JobComplete,
-Status: v1.ConditionTrue,
-Reason: batch.JobReasonSuccessPolicy,
-Message: "Matched rules at index 0",
-},
},
},
},
@@ -4152,12 +4457,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
},
-{
-Type: batch.JobComplete,
-Status: v1.ConditionTrue,
-Reason: batch.JobReasonSuccessPolicy,
-Message: "Matched rules at index 0",
-},
},
},
},
@@ -4219,12 +4518,8 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
},
},
},
-// In the current mechanism, the job controller adds Complete condition to Job
-// even if some running pods still remain.
-// So, we need to revisit here before we graduate the JobSuccessPolicy to beta.
-// TODO(#123775): A Job might finish with ready!=0
// REF: https://github.com/kubernetes/kubernetes/issues/123775
-"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
+"job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition when job meets to successPolicy and some pods still are running": {
enableJobSuccessPolicy: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{
@@ -4275,8 +4570,58 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
},
},
},
},
// REF: https://github.com/kubernetes/kubernetes/issues/123775
"job with successPolicy; JobManagedBy feature enabled; job has SuccessCriteriaMet condition when job meets to successPolicy and some pods still are running": {
enableJobSuccessPolicy: true,
enableJobPodReplacementPolicy: false,
enableJobManagedBy: true,
job: batch.Job{
TypeMeta: validTypeMeta,
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
CompletionMode: ptr.To(batch.IndexedCompletion),
Parallelism: ptr.To[int32](3),
Completions: ptr.To[int32](3),
BackoffLimit: ptr.To[int32](math.MaxInt32),
BackoffLimitPerIndex: ptr.To[int32](3),
SuccessPolicy: &batch.SuccessPolicy{
Rules: []batch.SuccessPolicyRule{{
SucceededIndexes: ptr.To("0,1"),
SucceededCount: ptr.To[int32](1),
}},
},
},
Status: batch.JobStatus{
Conditions: []batch.JobCondition{
{
-Type: batch.JobComplete,
+Type: batch.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
},
},
},
},
pods: []v1.Pod{
*buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
*buildPod().uid("a2").index("1").phase(v1.PodRunning).trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
*buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
Terminating: nil,
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
Reason: batch.JobReasonSuccessPolicy,
Message: "Matched rules at index 0",
@@ -4339,12 +4684,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
Reason: batch.JobReasonPodFailurePolicy,
Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
},
-{
-Type: batch.JobFailed,
-Status: v1.ConditionTrue,
-Reason: batch.JobReasonPodFailurePolicy,
-Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
-},
},
},
},
@@ -4383,6 +4722,12 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
FailedIndexes: ptr.To("0"),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
+{
+Type: batch.JobFailureTarget,
+Status: v1.ConditionTrue,
+Reason: batch.JobReasonFailedIndexes,
+Message: "Job has failed indexes",
+},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
@@ -4425,6 +4770,12 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
+{
+Type: batch.JobFailureTarget,
+Status: v1.ConditionTrue,
+Reason: batch.JobReasonBackoffLimitExceeded,
+Message: "Job has reached the specified backoff limit",
+},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
@@ -4753,6 +5104,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
fakeClock := clocktesting.NewFakeClock(now)
@ -4821,12 +5173,15 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
enableJobBackoffLimitPerIndex bool enableJobBackoffLimitPerIndex bool
enableJobPodFailurePolicy bool enableJobPodFailurePolicy bool
enableJobPodReplacementPolicy bool
enableJobManagedBy bool
job batch.Job job batch.Job
pods []v1.Pod pods []v1.Pod
wantStatus batch.JobStatus wantStatus batch.JobStatus
}{ }{
"successful job after a single failure within index": { "successful job after a single failure within index": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -4853,6 +5208,10 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
FailedIndexes: ptr.To(""), FailedIndexes: ptr.To(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{ Conditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{ {
Type: batch.JobComplete, Type: batch.JobComplete,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
@ -4862,6 +5221,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"single failed pod, not counted as the replacement pod creation is delayed": { "single failed pod, not counted as the replacement pod creation is delayed": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -4887,6 +5247,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"single failed pod replaced already": { "single failed pod replaced already": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -4914,6 +5275,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"single failed index due to exceeding the backoff limit per index, the job continues": { "single failed index due to exceeding the backoff limit per index, the job continues": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -4941,6 +5303,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
"single failed index due to FailIndex action, the job continues": { "single failed index due to FailIndex action, the job continues": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true, enableJobPodFailurePolicy: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -4990,6 +5353,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
"job failed index due to FailJob action": { "job failed index due to FailJob action": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true, enableJobPodFailurePolicy: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -5054,6 +5418,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
"job pod failure ignored due to matching Ignore action": { "job pod failure ignored due to matching Ignore action": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true, enableJobPodFailurePolicy: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -5103,6 +5468,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"job failed due to exceeding backoffLimit before backoffLimitPerIndex": { "job failed due to exceeding backoffLimit before backoffLimitPerIndex": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -5127,6 +5493,12 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
Terminating: ptr.To[int32](0), Terminating: ptr.To[int32](0),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{ Conditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
{ {
Type: batch.JobFailed, Type: batch.JobFailed,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
@ -5138,6 +5510,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"job failed due to failed indexes": { "job failed due to failed indexes": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -5163,6 +5536,12 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
CompletedIndexes: "1", CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{ Conditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonFailedIndexes,
Message: "Job has failed indexes",
},
{ {
Type: batch.JobFailed, Type: batch.JobFailed,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
@ -5174,6 +5553,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"job failed due to exceeding max failed indexes": { "job failed due to exceeding max failed indexes": {
enableJobBackoffLimitPerIndex: true, enableJobBackoffLimitPerIndex: true,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@@ -5203,7 +5583,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
-Type: batch.JobFailed,
+Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonMaxFailedIndexesExceeded,
Message: "Job has exceeded the specified maximal number of failed indexes",
@ -5213,6 +5593,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
}, },
"job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": { "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": {
enableJobBackoffLimitPerIndex: false, enableJobBackoffLimitPerIndex: false,
enableJobPodReplacementPolicy: true,
job: batch.Job{ job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta, ObjectMeta: validObjectMeta,
@ -5241,11 +5622,167 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
}, },
}, },
"job failed due to failed indexes; JobPodReplacementPolicy and JobManagedBy disabled": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
BackoffLimit: ptr.To[int32](math.MaxInt32),
CompletionMode: ptr.To(batch.IndexedCompletion),
BackoffLimitPerIndex: ptr.To[int32](1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
FailedIndexes: ptr.To("0"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: batch.JobReasonFailedIndexes,
Message: "Job has failed indexes",
},
},
},
},
"job failed due to failed indexes; JobManagedBy enabled": {
enableJobBackoffLimitPerIndex: true,
enableJobManagedBy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
BackoffLimit: ptr.To[int32](math.MaxInt32),
CompletionMode: ptr.To(batch.IndexedCompletion),
BackoffLimitPerIndex: ptr.To[int32](1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
FailedIndexes: ptr.To("0"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonFailedIndexes,
Message: "Job has failed indexes",
},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: batch.JobReasonFailedIndexes,
Message: "Job has failed indexes",
},
},
},
},
"job failed due to exceeding max failed indexes; JobPodReplacementPolicy and JobManagedBy disabled": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: ptr.To[int32](4),
Completions: ptr.To[int32](4),
BackoffLimit: ptr.To[int32](math.MaxInt32),
CompletionMode: ptr.To(batch.IndexedCompletion),
BackoffLimitPerIndex: ptr.To[int32](1),
MaxFailedIndexes: ptr.To[int32](1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 3,
Succeeded: 1,
FailedIndexes: ptr.To("0,2"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: batch.JobReasonMaxFailedIndexesExceeded,
Message: "Job has exceeded the specified maximal number of failed indexes",
},
},
},
},
"job failed due to exceeding max failed indexes; JobManagedBy enabled": {
enableJobBackoffLimitPerIndex: true,
enableJobManagedBy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: ptr.To[int32](4),
Completions: ptr.To[int32](4),
BackoffLimit: ptr.To[int32](math.MaxInt32),
CompletionMode: ptr.To(batch.IndexedCompletion),
BackoffLimitPerIndex: ptr.To[int32](1),
MaxFailedIndexes: ptr.To[int32](1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 3,
Succeeded: 1,
FailedIndexes: ptr.To("0,2"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonMaxFailedIndexesExceeded,
Message: "Job has exceeded the specified maximal number of failed indexes",
},
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
fakeClock := clocktesting.NewFakeClock(now)
manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
@@ -6431,6 +6968,10 @@ func TestJobBackoffForOnFailure(t *testing.T) {
expectedSucceeded int32
expectedFailed int32
expectedConditions []batch.JobCondition
+// features
+enableJobManagedBy bool
+enableJobPodReplacementPolicy bool
}{
"backoffLimit 0 should have 1 pod active": {
parallelism: 1,
@ -6618,10 +7159,100 @@ func TestJobBackoffForOnFailure(t *testing.T) {
}, },
}, },
}, },
"finished job; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
parallelism: 2,
completions: 4,
backoffLimit: 6,
suspend: true,
restartCounts: []int32{1, 1, 2, 0},
podPhase: v1.PodSucceeded,
expectedActive: 0,
expectedSucceeded: 4,
expectedFailed: 0,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
},
},
"finished job; JobManagedBy enabled": {
enableJobManagedBy: true,
parallelism: 2,
completions: 4,
backoffLimit: 6,
suspend: true,
restartCounts: []int32{1, 1, 2, 0},
podPhase: v1.PodSucceeded,
expectedActive: 0,
expectedSucceeded: 4,
expectedFailed: 0,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
},
},
"too many job failures with podRunning - multiple pods; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
parallelism: 2,
completions: 5,
backoffLimit: 2,
suspend: false,
restartCounts: []int32{1, 1},
podPhase: v1.PodRunning,
expectedActive: 0,
expectedSucceeded: 0,
expectedFailed: 2,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
},
},
"too many job failures with podRunning - multiple pods; JobManagedBy enabled": {
enableJobManagedBy: true,
parallelism: 2,
completions: 5,
backoffLimit: 2,
suspend: false,
restartCounts: []int32{1, 1},
podPhase: v1.PodRunning,
expectedActive: 0,
expectedSucceeded: 0,
expectedFailed: 2,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
+featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
// job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
@@ -6723,6 +7354,12 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
expectedSucceeded: 0,
expectedFailed: 2,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
@@ -6754,6 +7391,12 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
expectedSucceeded: 0,
expectedFailed: 7,
expectedConditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batch.JobReasonBackoffLimitExceeded,
Message: "Job has reached the specified backoff limit",
},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,

View File

@@ -50,7 +50,7 @@ func matchSuccessPolicy(logger klog.Logger, successPolicy *batch.SuccessPolicy,
}
func hasSuccessCriteriaMetCondition(job *batch.Job) *batch.JobCondition {
-if feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) {
+if feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) || delayTerminalCondition() {
successCriteriaMet := findConditionByType(job.Status.Conditions, batch.JobSuccessCriteriaMet)
if successCriteriaMet != nil && successCriteriaMet.Status == v1.ConditionTrue {
return successCriteriaMet
@@ -60,7 +60,7 @@ func hasSuccessCriteriaMetCondition(job *batch.Job) *batch.JobCondition {
}
func isSuccessCriteriaMetCondition(cond *batch.JobCondition) bool {
-return feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) &&
+return (feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) || delayTerminalCondition()) &&
cond != nil && cond.Type == batch.JobSuccessCriteriaMet && cond.Status == v1.ConditionTrue
}

View File

@ -404,9 +404,17 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt
RejectCompleteJobWithoutCompletionTime: isJobCompleteChanged || isCompletionTimeChanged, RejectCompleteJobWithoutCompletionTime: isJobCompleteChanged || isCompletionTimeChanged,
RejectCompleteJobWithFailedCondition: isJobCompleteChanged || isJobFailedChanged, RejectCompleteJobWithFailedCondition: isJobCompleteChanged || isJobFailedChanged,
RejectCompleteJobWithFailureTargetCondition: isJobCompleteChanged || isJobFailureTargetChanged, RejectCompleteJobWithFailureTargetCondition: isJobCompleteChanged || isJobFailureTargetChanged,
AllowForSuccessCriteriaMetInExtendedScope: true,
} }
} }
return batchvalidation.JobStatusValidationOptions{} if utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
return batchvalidation.JobStatusValidationOptions{
AllowForSuccessCriteriaMetInExtendedScope: true,
}
}
return batchvalidation.JobStatusValidationOptions{
AllowForSuccessCriteriaMetInExtendedScope: batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuccessCriteriaMet),
}
} }
// WarningsOnUpdate returns warnings for the given update. // WarningsOnUpdate returns warnings for the given update.

View File

@ -2065,6 +2065,7 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
enableJobManagedBy bool enableJobManagedBy bool
enableJobSuccessPolicy bool enableJobSuccessPolicy bool
enableJobPodReplacementPolicy bool
job *batch.Job job *batch.Job
newJob *batch.Job newJob *batch.Job
@ -3021,6 +3022,37 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "status.conditions"}, {Type: field.ErrorTypeInvalid, Field: "status.conditions"},
}, },
}, },
"valid update of Job if SuccessCriteriaMet already present for NonIndexed Jobs; JobSuccessPolicy enabled, while JobManagedBy and JobPodReplacementPolicy disabled": {
enableJobSuccessPolicy: true,
enableJobManagedBy: false,
enableJobPodReplacementPolicy: false,
job: &batch.Job{
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{},
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobSuccessCriteriaMet,
Status: api.ConditionTrue,
}},
},
},
newJob: &batch.Job{
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{},
Status: batch.JobStatus{
Conditions: []batch.JobCondition{
{
Type: batch.JobSuccessCriteriaMet,
Status: api.ConditionTrue,
},
{
Type: batch.JobComplete,
Status: api.ConditionTrue,
},
},
},
},
},
"invalid addition of SuccessCriteriaMet for Job with Failed": { "invalid addition of SuccessCriteriaMet for Job with Failed": {
enableJobSuccessPolicy: true, enableJobSuccessPolicy: true,
job: &batch.Job{ job: &batch.Job{
@ -3378,11 +3410,42 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) {
{Type: field.ErrorTypeInvalid, Field: "status.conditions"}, {Type: field.ErrorTypeInvalid, Field: "status.conditions"},
}, },
}, },
"valid addition of SuccessCriteriaMet when JobManagedBy is enabled": {
enableJobManagedBy: true,
job: &batch.Job{
ObjectMeta: validObjectMeta,
},
newJob: &batch.Job{
ObjectMeta: validObjectMeta,
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobSuccessCriteriaMet,
Status: api.ConditionTrue,
}},
},
},
},
"valid addition of SuccessCriteriaMet when JobPodReplacementPolicy is enabled": {
enableJobPodReplacementPolicy: true,
job: &batch.Job{
ObjectMeta: validObjectMeta,
},
newJob: &batch.Job{
ObjectMeta: validObjectMeta,
Status: batch.JobStatus{
Conditions: []batch.JobCondition{{
Type: batch.JobSuccessCriteriaMet,
Status: api.ConditionTrue,
}},
},
},
},
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)
errs := StatusStrategy.ValidateUpdate(ctx, tc.newJob, tc.job) errs := StatusStrategy.ValidateUpdate(ctx, tc.newJob, tc.job)
if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" { if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" {

View File

@ -21,7 +21,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"time"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -83,6 +82,10 @@ var _ = SIGDescribe("Job", func() {
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -130,6 +133,10 @@ var _ = SIGDescribe("Job", func() {
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonPodFailurePolicy)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job fails") ginkgo.By("Ensuring job fails")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name) err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)
@ -168,6 +175,10 @@ var _ = SIGDescribe("Job", func() {
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -248,6 +259,10 @@ var _ = SIGDescribe("Job", func() {
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete) err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -337,6 +352,10 @@ var _ = SIGDescribe("Job", func() {
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete) err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -369,6 +388,10 @@ var _ = SIGDescribe("Job", func() {
job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Waiting for job to complete") ginkgo.By("Waiting for job to complete")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -537,6 +560,10 @@ done`}
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonFailedIndexes)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as there are failed indexes") ginkgo.By("Awaiting for the job to fail as there are failed indexes")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name) err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -573,6 +600,10 @@ done`}
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonMaxFailedIndexesExceeded)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as the number of max failed indexes is exceeded") ginkgo.By("Awaiting for the job to fail as the number of max failed indexes is exceeded")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name) err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -616,6 +647,10 @@ done`}
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonFailedIndexes)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as all indexes are failed") ginkgo.By("Awaiting for the job to fail as all indexes are failed")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name) err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -705,6 +740,10 @@ done`}
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -720,8 +759,13 @@ done`}
job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit) job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonDeadlineExceeded)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job past active deadline") ginkgo.By("Ensuring job past active deadline")
err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded") err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonDeadlineExceeded)
framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
}) })
@ -823,9 +867,13 @@ done`}
job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff)) job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring job exceed backofflimit")
err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded") ginkgo.By("Awaiting for the job to have the interim failure condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonBackoffLimitExceeded)
framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job exceed backofflimit")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonBackoffLimitExceeded)
framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name)
ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1)) ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
@ -870,6 +918,10 @@ done`}
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to have the interim success condition")
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
@ -1186,22 +1238,26 @@ func waitForJobEvent(ctx context.Context, config watchEventConfig) {
} }
} }
// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. // waitForJobCondition waits for the specified Job to have the expected condition with the specific reason.
func waitForJobFailure(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error { func waitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason string) error {
return wait.Poll(framework.Poll, timeout, func() (bool, error) { err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil { if err != nil {
return false, err return false, err
} }
for _, c := range curr.Status.Conditions { for _, c := range curr.Status.Conditions {
if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue { if c.Type == cType && c.Status == v1.ConditionTrue {
if reason == "" || reason == c.Reason { if reason == c.Reason {
return true, nil return true, nil
} }
} }
} }
return false, nil return false, nil
}) })
if err != nil {
return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %q: %w", jobName, cType, reason, err)
}
return nil
} }
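As a usage note (not part of the diff itself): the rewritten helper compares the reason exactly instead of treating an empty reason as a wildcard, which is why the interim success waits above pass "" while the failure waits pass a batchv1.JobReason* value. A minimal sketch of the two-step pattern, reusing the fixture names (ctx, f, job, completions) assumed from the enclosing ginkgo spec:

// Sketch only: interim condition first, terminal condition last.
ginkgo.By("Awaiting for the job to have the interim success condition")
// The interim SuccessCriteriaMet condition carries no reason here, so "" is passed
// for the exact-match comparison to succeed.
err = waitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "")
framework.ExpectNoError(err, "failed to observe the interim success condition")
ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to observe job completion")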
func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition { func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition {

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1" eventsv1 "k8s.io/api/events/v1"
@ -1160,6 +1161,301 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T)
} }
} }
// TestDelayTerminalPhaseCondition tests the fix that makes the Job controller
// delay setting the terminal phase conditions (Failed and Complete) until all
// Pods are terminal. Until then, the fate of the Job is indicated by the
// interim conditions FailureTarget or SuccessCriteriaMet.
func TestDelayTerminalPhaseCondition(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
podTemplateSpec := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main-container",
Image: "foo",
ImagePullPolicy: v1.PullIfNotPresent,
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
},
},
},
}
failOnePod := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) {
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodFailed, err)
}
}
succeedOnePodAndScaleDown := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) {
// mark one pod as succeeded
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
if _, err := updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
j.Spec.Parallelism = ptr.To[int32](1)
j.Spec.Completions = ptr.To[int32](1)
}); err != nil {
t.Fatalf("Unexpected error when scaling down the job: %v", err)
}
}
testCases := map[string]struct {
enableJobManagedBy bool
enableJobPodReplacementPolicy bool
job batchv1.Job
action func(context.Context, clientset.Interface, *batchv1.Job)
wantInterimStatus *batchv1.JobStatus
wantTerminalStatus batchv1.JobStatus
}{
"job backoff limit exceeded; JobPodReplacementPolicy and JobManagedBy disabled": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job backoff limit exceeded; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantInterimStatus: &batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job backoff limit exceeded; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
Template: podTemplateSpec,
BackoffLimit: ptr.To[int32](0),
},
},
action: failOnePod,
wantInterimStatus: &batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Failed: 2,
Ready: ptr.To[int32](0),
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
{
Type: batchv1.JobFailed,
Status: v1.ConditionTrue,
Reason: batchv1.JobReasonBackoffLimitExceeded,
},
},
},
},
"job scale down to meet completions; JobPodReplacementPolicy and JobManagedBy disabled": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
"job scale down to meet completions; JobPodReplacementPolicy enabled": {
enableJobPodReplacementPolicy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantInterimStatus: &batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
"job scale down to meet completions; JobManagedBy enabled": {
enableJobManagedBy: true,
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2),
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Template: podTemplateSpec,
},
},
action: succeedOnePodAndScaleDown,
wantInterimStatus: &batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
},
},
wantTerminalStatus: batchv1.JobStatus{
Succeeded: 1,
Ready: ptr.To[int32](0),
CompletedIndexes: "0",
Conditions: []batchv1.JobCondition{
{
Type: batchv1.JobSuccessCriteriaMet,
Status: v1.ConditionTrue,
},
{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
}
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true)
closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
waitForPodsToBeActive(ctx, t, jobClient, *jobObj.Spec.Parallelism, jobObj)
test.action(ctx, clientSet, jobObj)
if test.wantInterimStatus != nil {
validateJobStatus(ctx, t, clientSet, jobObj, *test.wantInterimStatus)
// Set a terminal phase on all the remaining pods to simulate
// Kubelet (or other components, such as PodGC) marking them terminal.
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(s v1.PodStatus) bool {
return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning)
})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, len(jobPods)); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err)
}
}
validateJobStatus(ctx, t, clientSet, jobObj, test.wantTerminalStatus)
})
}
}
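Outside of the commit itself, the ordering this test pins down can also be consumed from the client side: the interim conditions declare the Job's fate while pods may still be terminating, and Complete/Failed appear only once all pods are terminal. A minimal sketch (not part of this change; package name, poll interval, and timeout are illustrative assumptions) using plain client-go:

// Sketch only (not part of this commit): distinguish the interim "fate" conditions
// (SuccessCriteriaMet, FailureTarget) from the terminal ones (Complete, Failed).
package jobwatch

import (
	"context"
	"fmt"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// jobFate reports whether the Job's outcome is already decided (interim condition)
// and whether it has fully terminated (terminal condition).
func jobFate(job *batchv1.Job) (decided, terminal bool) {
	for _, c := range job.Status.Conditions {
		if c.Status != corev1.ConditionTrue {
			continue
		}
		switch c.Type {
		case batchv1.JobSuccessCriteriaMet, batchv1.JobFailureTarget:
			decided = true
		case batchv1.JobComplete, batchv1.JobFailed:
			decided, terminal = true, true
		}
	}
	return decided, terminal
}

// WaitForTerminal polls until the Job becomes terminal, noting when its fate is
// already decided but the remaining pods are still being waited on.
func WaitForTerminal(ctx context.Context, cs kubernetes.Interface, ns, name string) error {
	return wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) {
		job, err := cs.BatchV1().Jobs(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		decided, terminal := jobFate(job)
		if decided && !terminal {
			fmt.Printf("Job %s/%s fate decided; waiting for remaining pods to terminate\n", ns, name)
		}
		return terminal, nil
	})
}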
// TestBackoffLimitPerIndex tests handling of job and its pods when // TestBackoffLimitPerIndex tests handling of job and its pods when
// backoff limit per index is used. // backoff limit per index is used.
func TestBackoffLimitPerIndex(t *testing.T) { func TestBackoffLimitPerIndex(t *testing.T) {
@ -2821,7 +3117,7 @@ func TestElasticIndexedJob(t *testing.T) {
jobUpdates: []jobUpdate{ jobUpdates: []jobUpdate{
{ {
completions: ptr.To[int32](0), completions: ptr.To[int32](0),
wantTerminating: ptr.To[int32](3), wantTerminating: ptr.To[int32](0),
}, },
}, },
}, },
@ -3595,6 +3891,25 @@ func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, cl
} }
} }
func validateJobStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, wantStatus batchv1.JobStatus) {
t.Helper()
diff := ""
if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
gotJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get updated Job: %v, last status diff (-want,+got):\n%s", err, diff)
}
diff = cmp.Diff(wantStatus, gotJob.Status,
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(batchv1.JobStatus{}, "StartTime", "UncountedTerminatedPods", "CompletionTime"),
cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastProbeTime", "LastTransitionTime", "Message"),
)
return diff == "", nil
}); err != nil {
t.Fatalf("Waiting for Job Status: %v\n, Status diff (-want,+got):\n%s", err, diff)
}
}
func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) { func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper() t.Helper()
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired) validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)