diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index 21ecefdf96d..d731732a1ba 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -518,6 +518,16 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValida allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "completionTime cannot be set before startTime")) } } + if opts.RejectFailedJobWithoutFailureTarget { + if IsJobFailed(job) && !isJobFailureTarget(job) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Failed=True condition without the FailureTarget=true condition")) + } + } + if opts.RejectCompleteJobWithoutSuccessCriteriaMet { + if IsJobComplete(job) && !isJobSuccessCriteriaMet(job) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True condition without the SuccessCriteriaMet=true condition")) + } + } isJobFinished := IsJobFinished(job) if opts.RejectFinishedJobWithActivePods { if status.Active > 0 && isJobFinished { @@ -568,7 +578,17 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValida } } } - if ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion && isJobSuccessCriteriaMet(job) { + if opts.RejectFinishedJobWithTerminatingPods { + if status.Terminating != nil && *status.Terminating > 0 && isJobFinished { + allErrs = append(allErrs, field.Invalid(fldPath.Child("terminating"), status.Terminating, "terminating>0 is invalid for finished job")) + } + } + if opts.RejectMoreReadyThanActivePods { + if status.Ready != nil && *status.Ready > status.Active { + allErrs = append(allErrs, field.Invalid(fldPath.Child("ready"), *status.Ready, "cannot set more ready pods than active")) + } + } + if !opts.AllowForSuccessCriteriaMetInExtendedScope && ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) != batch.IndexedCompletion && isJobSuccessCriteriaMet(job) { allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet to NonIndexed Job")) } if isJobSuccessCriteriaMet(job) && IsJobFailed(job) { @@ -577,7 +597,7 @@ func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValida if isJobSuccessCriteriaMet(job) && isJobFailureTarget(job) { allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True and FailureTarget=true conditions")) } - if job.Spec.SuccessPolicy == nil && isJobSuccessCriteriaMet(job) { + if !opts.AllowForSuccessCriteriaMetInExtendedScope && job.Spec.SuccessPolicy == nil && isJobSuccessCriteriaMet(job) { allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set SuccessCriteriaMet=True for Job without SuccessPolicy")) } if job.Spec.SuccessPolicy != nil && !isJobSuccessCriteriaMet(job) && IsJobComplete(job) { @@ -1005,6 +1025,8 @@ type JobStatusValidationOptions struct { RejectFailedIndexesOverlappingCompleted bool RejectCompletedIndexesForNonIndexedJob bool RejectFailedIndexesForNoBackoffLimitPerIndex bool + RejectFailedJobWithoutFailureTarget bool + RejectCompleteJobWithoutSuccessCriteriaMet bool RejectFinishedJobWithActivePods bool RejectFinishedJobWithoutStartTime bool RejectFinishedJobWithUncountedTerminatedPods bool @@ -1015,4 +1037,7 @@ type JobStatusValidationOptions struct { 
RejectNotCompleteJobWithCompletionTime bool RejectCompleteJobWithFailedCondition bool RejectCompleteJobWithFailureTargetCondition bool + AllowForSuccessCriteriaMetInExtendedScope bool + RejectMoreReadyThanActivePods bool + RejectFinishedJobWithTerminatingPods bool } diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index dc694bd4772..bb726484020 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -844,7 +844,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // Given that the Job already has the SuccessCriteriaMet condition, the termination condition already had confirmed in another cycle. // So, the job-controller evaluates the podFailurePolicy only when the Job doesn't have the SuccessCriteriaMet condition. - if jobCtx.finishedCondition == nil && feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { + if jobCtx.finishedCondition == nil && (feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition()) { failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget) if failureTargetCondition != nil && failureTargetCondition.Status == v1.ConditionTrue { jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now()) @@ -857,9 +857,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) { // check if the number of pod restart exceeds backoff (for restart OnFailure only) // OR if the number of failed jobs increased since the last syncJob - jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit", jm.clock.Now()) + jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit") } else if jm.pastActiveDeadline(&job) { - jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now()) + jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline") } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) { syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time) logger.V(2).Info("Job has activeDeadlineSeconds configuration. 
Will sync this job again", "key", key, "nextSyncIn", syncDuration) @@ -874,9 +874,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods) if jobCtx.finishedCondition == nil { if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) { - jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now()) + jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonMaxFailedIndexesExceeded, "Job has exceeded the specified maximal number of failed indexes") } else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) { - jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonFailedIndexes, "Job has failed indexes", jm.clock.Now()) + jobCtx.finishedCondition = jm.newFailureCondition(batch.JobReasonFailedIndexes, "Job has failed indexes") } } jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) @@ -925,7 +925,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0 } if complete { - jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now()) + jobCtx.finishedCondition = jm.newSuccessCondition() } else if manageJobCalled { // Update the conditions / emit events only if manageJob was called in // this syncJob. Otherwise wait for the right syncJob call to make @@ -975,6 +975,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { return manageJobErr } +func (jm *Controller) newFailureCondition(reason, message string) *batch.JobCondition { + cType := batch.JobFailed + if delayTerminalCondition() { + cType = batch.JobFailureTarget + } + return newCondition(cType, v1.ConditionTrue, reason, message, jm.clock.Now()) +} + +func (jm *Controller) newSuccessCondition() *batch.JobCondition { + cType := batch.JobComplete + if delayTerminalCondition() { + cType = batch.JobSuccessCriteriaMet + } + return newCondition(cType, v1.ConditionTrue, "", "", jm.clock.Now()) +} + +func delayTerminalCondition() bool { + return feature.DefaultFeatureGate.Enabled(features.JobManagedBy) || + feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) +} + // deleteActivePods issues deletion for active Pods, preserving finalizers. // This is done through DELETE calls that set deletion timestamps. // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after @@ -1165,7 +1186,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job needsFlush = true } } - if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { + if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || delayTerminalCondition() { if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget { // Append the interim FailureTarget condition to update the job status with before finalizers are removed. 
@@ -1378,6 +1399,12 @@ func (jm *Controller) enactJobFinished(logger klog.Logger, jobCtx *syncJobCtx) b return false } } + if delayTerminalCondition() { + if *jobCtx.terminating > 0 { + logger.V(4).Info("Delaying marking the Job as finished, because there are still terminating pod(s)", "job", klog.KObj(job), "condition", jobCtx.finishedCondition.Type, "count", *jobCtx.terminating) + return false + } + } finishedCond := jobCtx.finishedCondition job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message, jm.clock.Now()) if finishedCond.Type == batch.JobComplete { @@ -1964,12 +1991,17 @@ func countReadyPods(pods []*v1.Pod) int32 { // trackTerminatingPods checks if the count of terminating pods is tracked. // They are tracked when any the following is true: -// - JobPodReplacementPolicy is enabled to be returned in the status field, -// - only failed pods are replaced, because pod failure policy is used +// - JobPodReplacementPolicy is enabled to be returned in the status field; +// and to delay setting the Job terminal condition, +// - JobManagedBy is enabled to delay setting Job terminal condition, +// - only failed pods are replaced, because pod failure policy is used func trackTerminatingPods(job *batch.Job) bool { if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { return true } + if feature.DefaultFeatureGate.Enabled(features.JobManagedBy) { + return true + } return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 172e5b700a5..3a008de44f7 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -292,6 +292,7 @@ func TestControllerSyncJob(t *testing.T) { jobPodReplacementPolicy bool jobPodFailurePolicy bool jobSuccessPolicy bool + jobManagedBy bool }{ "job start": { parallelism: 2, @@ -827,7 +828,7 @@ func TestControllerSyncJob(t *testing.T) { expectedCompletedIdxs: "0", expectedConditions: []batch.JobCondition{ { - Type: batch.JobFailed, + Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonBackoffLimitExceeded, Message: "Job has reached the specified backoff limit", @@ -1220,6 +1221,32 @@ func TestControllerSyncJob(t *testing.T) { }, }, }, + "backoff limit exceeded; JobManagedBy enabled": { + jobManagedBy: true, + parallelism: 2, + completions: 3, + backoffLimit: 0, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"2", v1.PodRunning}, + }, + expectedSucceeded: 1, + expectedFailed: 2, + expectedCompletedIdxs: "0", + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, + }, + expectedPodPatches: 3, + expectedReady: ptr.To[int32](0), + expectedDeletions: 1, + }, } for name, tc := range testCases { @@ -1229,6 +1256,7 @@ func TestControllerSyncJob(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobPodFailurePolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, 
features.JobSuccessPolicy, tc.jobSuccessPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.jobManagedBy) // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) @@ -1546,6 +1574,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { completedCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", now) succeededCond := newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, "", "", minuteAgo) failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", now) + failureTargetCond := newCondition(batch.JobFailureTarget, v1.ConditionTrue, "", "", now) indexedCompletion := batch.IndexedCompletion mockErr := errors.New("mock error") cases := map[string]struct { @@ -1565,6 +1594,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { // features enableJobBackoffLimitPerIndex bool enableJobSuccessPolicy bool + enableJobPodReplacementPolicy bool + enableJobManagedBy bool }{ "no updates": {}, "new active": { @@ -2114,6 +2145,215 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, wantFailedPodsMetric: 2, }, + "pod is terminal; JobFailed condition is set": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod, + }, + finishedCond: failedCond, + wantRmFinalizers: 1, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Failed: 1, + Conditions: []batch.JobCondition{*failedCond}, + }, + }, + wantFailedPodsMetric: 1, + }, + "pod is terminating; counted as failed, but the JobFailed condition is delayed; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: failedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Failed: 1, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + }, + wantFailedPodsMetric: 1, + }, + "pod is terminating; counted as failed, but the JobFailed condition is delayed; JobManagedBy enabled": { + enableJobManagedBy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: failedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Failed: 1, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + }, + wantFailedPodsMetric: 1, + }, + "pod is terminating; counted as failed, JobFailed condition is not delayed; 
JobPodReplacementPolicy and JobManagedBy disabled": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: failedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Failed: 1, + Conditions: []batch.JobCondition{*failureTargetCond}, + }, + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Failed: 1, + Conditions: []batch.JobCondition{*failureTargetCond, *failedCond}, + }, + }, + wantFailedPodsMetric: 1, + }, + "pod is terminating; JobSuccessCriteriaMet, but JobComplete condition is delayed; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + Succeeded: []types.UID{"b"}, + }, + Conditions: []batch.JobCondition{*succeededCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: completedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 1, + Failed: 1, + Conditions: []batch.JobCondition{*succeededCond}, + }, + }, + wantFailedPodsMetric: 1, + wantSucceededPodsMetric: 1, + }, + "pod is terminating; JobSuccessCriteriaMet, but JobComplete condition is delayed; JobManagedBy enabled": { + enableJobManagedBy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + Succeeded: []types.UID{"b"}, + }, + Conditions: []batch.JobCondition{*succeededCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: completedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 1, + Failed: 1, + Conditions: []batch.JobCondition{*succeededCond}, + }, + }, + wantFailedPodsMetric: 1, + wantSucceededPodsMetric: 1, + }, + "pod is terminating; JobSuccessCriteriaMet, JobComplete condition is not delayed; JobPodReplacementPolicy and JobManagedBy disabled": { + enableJobSuccessPolicy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + }, + Status: batch.JobStatus{ + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + Succeeded: []types.UID{"b"}, + }, + Conditions: []batch.JobCondition{*succeededCond}, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodRunning).deletionTimestamp().Pod, + }, + finishedCond: completedCond, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 1, + Failed: 1, + Conditions: []batch.JobCondition{*succeededCond}, + }, + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 1, + Failed: 1, + Conditions: []batch.JobCondition{*succeededCond, *completedCond}, + CompletionTime: 
ptr.To(metav1.NewTime(now)), + }, + }, + wantFailedPodsMetric: 1, + wantSucceededPodsMetric: 1, + }, + "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": { enableJobBackoffLimitPerIndex: true, job: batch.Job{ @@ -2200,6 +2440,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { t.Run(name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock) @@ -2222,14 +2464,15 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { expectedRmFinalizers: tc.expectedRmFinalizers, finishedCondition: tc.finishedCond, } + jobCtx.activePods = controller.FilterActivePods(logger, tc.pods) if isIndexedJob(job) { jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil { jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods) - jobCtx.activePods = controller.FilterActivePods(logger, tc.pods) jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) } } + jobCtx.terminating = ptr.To(controller.CountTerminatingPods(tc.pods)) err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) if !errors.Is(err, tc.wantErr) { @@ -2287,6 +2530,10 @@ func TestSyncJobPastDeadline(t *testing.T) { expectedSucceeded int32 expectedFailed int32 expectedConditions []batch.JobCondition + + // features + enableJobPodReplacementPolicy bool + enableJobManagedBy bool }{ "activeDeadlineSeconds less than single pod execution": { parallelism: 1, @@ -2373,10 +2620,79 @@ func TestSyncJobPastDeadline(t *testing.T) { }, }, }, + "activeDeadlineSeconds exceeded; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + parallelism: 1, + completions: 2, + activeDeadlineSeconds: 10, + startTime: 15, + backoffLimit: 6, + activePods: 1, + succeededPods: 1, + expectedDeletions: 1, + expectedSucceeded: 1, + expectedFailed: 1, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonDeadlineExceeded, + Message: "Job was active longer than specified deadline", + }, + }, + }, + "activeDeadlineSeconds exceeded; JobManagedBy enabled": { + enableJobManagedBy: true, + + parallelism: 1, + completions: 2, + activeDeadlineSeconds: 10, + startTime: 15, + backoffLimit: 6, + activePods: 1, + succeededPods: 1, + expectedDeletions: 1, + expectedSucceeded: 1, + expectedFailed: 1, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonDeadlineExceeded, + Message: "Job was active longer than specified deadline", + }, + }, + }, + "activeDeadlineSeconds exceeded and backofflimit 
reached; JobManagedBy enabled": { + enableJobManagedBy: true, + + parallelism: 1, + completions: 1, + activeDeadlineSeconds: 1, + startTime: 10, + failedPods: 1, + expectedFailed: 1, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc) @@ -3894,6 +4210,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { enableBackoffLimitPerIndex bool enableJobSuccessPolicy bool enableJobPodReplacementPolicy bool + enableJobManagedBy bool job batch.Job pods []v1.Pod wantStatus batch.JobStatus @@ -3938,12 +4255,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, - { - Type: batch.JobComplete, - Status: v1.ConditionTrue, - Reason: batch.JobReasonSuccessPolicy, - Message: "Matched rules at index 0", - }, }, }, }, @@ -4044,12 +4355,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, - { - Type: batch.JobComplete, - Status: v1.ConditionTrue, - Reason: batch.JobReasonSuccessPolicy, - Message: "Matched rules at index 0", - }, }, }, }, @@ -4152,12 +4457,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, - { - Type: batch.JobComplete, - Status: v1.ConditionTrue, - Reason: batch.JobReasonSuccessPolicy, - Message: "Matched rules at index 0", - }, }, }, }, @@ -4219,12 +4518,8 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { }, }, }, - // In the current mechanism, the job controller adds Complete condition to Job - // even if some running pods still remain. - // So, we need to revisit here before we graduate the JobSuccessPolicy to beta. 
- // TODO(#123775): A Job might finish with ready!=0 // REF: https://github.com/kubernetes/kubernetes/issues/123775 - "job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": { + "job with successPolicy; jobPodReplacementPolicy feature enabled; job has SuccessCriteriaMet condition when job meets to successPolicy and some pods still are running": { enableJobSuccessPolicy: true, enableJobPodReplacementPolicy: true, job: batch.Job{ @@ -4275,8 +4570,58 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", }, + }, + }, + }, + // REF: https://github.com/kubernetes/kubernetes/issues/123775 + "job with successPolicy; JobManagedBy feature enabled; job has SuccessCriteriaMet condition when job meets to successPolicy and some pods still are running": { + enableJobSuccessPolicy: true, + enableJobPodReplacementPolicy: false, + enableJobManagedBy: true, + job: batch.Job{ + TypeMeta: validTypeMeta, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + CompletionMode: ptr.To(batch.IndexedCompletion), + Parallelism: ptr.To[int32](3), + Completions: ptr.To[int32](3), + BackoffLimit: ptr.To[int32](math.MaxInt32), + BackoffLimitPerIndex: ptr.To[int32](3), + SuccessPolicy: &batch.SuccessPolicy{ + Rules: []batch.SuccessPolicyRule{{ + SucceededIndexes: ptr.To("0,1"), + SucceededCount: ptr.To[int32](1), + }}, + }, + }, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + Reason: batch.JobReasonSuccessPolicy, + Message: "Matched rules at index 0", + }, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod, + *buildPod().uid("a2").index("1").phase(v1.PodRunning).trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod, + *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + Terminating: nil, + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ { - Type: batch.JobComplete, + Type: batch.JobSuccessCriteriaMet, Status: v1.ConditionTrue, Reason: batch.JobReasonSuccessPolicy, Message: "Matched rules at index 0", @@ -4339,12 +4684,6 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { Reason: batch.JobReasonPodFailurePolicy, Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", }, - { - Type: batch.JobFailed, - Status: v1.ConditionTrue, - Reason: batch.JobReasonPodFailurePolicy, - Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0", - }, }, }, }, @@ -4383,6 +4722,12 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { FailedIndexes: ptr.To("0"), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonFailedIndexes, + Message: "Job has failed indexes", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, @@ -4425,6 +4770,12 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ + { + Type: 
batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, @@ -4753,6 +5104,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(now) @@ -4821,12 +5173,15 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { testCases := map[string]struct { enableJobBackoffLimitPerIndex bool enableJobPodFailurePolicy bool + enableJobPodReplacementPolicy bool + enableJobManagedBy bool job batch.Job pods []v1.Pod wantStatus batch.JobStatus }{ "successful job after a single failure within index": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -4853,6 +5208,10 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { FailedIndexes: ptr.To(""), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, { Type: batch.JobComplete, Status: v1.ConditionTrue, @@ -4862,6 +5221,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "single failed pod, not counted as the replacement pod creation is delayed": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -4887,6 +5247,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "single failed pod replaced already": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -4914,6 +5275,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "single failed index due to exceeding the backoff limit per index, the job continues": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -4941,6 +5303,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { "single failed index due to FailIndex action, the job continues": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -4990,6 +5353,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { "job failed index due to FailJob action": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5054,6 +5418,7 @@ func 
TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { "job pod failure ignored due to matching Ignore action": { enableJobBackoffLimitPerIndex: true, enableJobPodFailurePolicy: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5103,6 +5468,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "job failed due to exceeding backoffLimit before backoffLimitPerIndex": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5127,6 +5493,12 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { Terminating: ptr.To[int32](0), UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, @@ -5138,6 +5510,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "job failed due to failed indexes": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5163,6 +5536,12 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { CompletedIndexes: "1", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonFailedIndexes, + Message: "Job has failed indexes", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, @@ -5174,6 +5553,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "job failed due to exceeding max failed indexes": { enableJobBackoffLimitPerIndex: true, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5203,7 +5583,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Conditions: []batch.JobCondition{ { - Type: batch.JobFailed, + Type: batch.JobFailureTarget, Status: v1.ConditionTrue, Reason: batch.JobReasonMaxFailedIndexesExceeded, Message: "Job has exceeded the specified maximal number of failed indexes", @@ -5213,6 +5593,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { }, "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": { enableJobBackoffLimitPerIndex: false, + enableJobPodReplacementPolicy: true, job: batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: validObjectMeta, @@ -5241,11 +5622,167 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, + "job failed due to failed indexes; JobPodReplacementPolicy and JobManagedBy disabled": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + BackoffLimit: ptr.To[int32](math.MaxInt32), + CompletionMode: ptr.To(batch.IndexedCompletion), + BackoffLimitPerIndex: ptr.To[int32](1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + 
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + FailedIndexes: ptr.To("0"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: batch.JobReasonFailedIndexes, + Message: "Job has failed indexes", + }, + }, + }, + }, + "job failed due to failed indexes; JobManagedBy enabled": { + enableJobBackoffLimitPerIndex: true, + enableJobManagedBy: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + BackoffLimit: ptr.To[int32](math.MaxInt32), + CompletionMode: ptr.To(batch.IndexedCompletion), + BackoffLimitPerIndex: ptr.To[int32](1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + FailedIndexes: ptr.To("0"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonFailedIndexes, + Message: "Job has failed indexes", + }, + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: batch.JobReasonFailedIndexes, + Message: "Job has failed indexes", + }, + }, + }, + }, + "job failed due to exceeding max failed indexes; JobPodReplacementPolicy and JobManagedBy disabled": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: ptr.To[int32](4), + Completions: ptr.To[int32](4), + BackoffLimit: ptr.To[int32](math.MaxInt32), + CompletionMode: ptr.To(batch.IndexedCompletion), + BackoffLimitPerIndex: ptr.To[int32](1), + MaxFailedIndexes: ptr.To[int32](1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 3, + Succeeded: 1, + FailedIndexes: ptr.To("0,2"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: batch.JobReasonMaxFailedIndexesExceeded, + Message: "Job has exceeded the specified maximal number of failed indexes", + }, + }, + }, + }, + "job failed due to exceeding max failed indexes; JobManagedBy enabled": { + enableJobBackoffLimitPerIndex: true, + enableJobManagedBy: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: ptr.To[int32](4), + Completions: ptr.To[int32](4), + BackoffLimit: 
ptr.To[int32](math.MaxInt32), + CompletionMode: ptr.To(batch.IndexedCompletion), + BackoffLimitPerIndex: ptr.To[int32](1), + MaxFailedIndexes: ptr.To[int32](1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 3, + Succeeded: 1, + FailedIndexes: ptr.To("0,2"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonMaxFailedIndexesExceeded, + Message: "Job has exceeded the specified maximal number of failed indexes", + }, + }, + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) fakeClock := clocktesting.NewFakeClock(now) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) @@ -6431,6 +6968,10 @@ func TestJobBackoffForOnFailure(t *testing.T) { expectedSucceeded int32 expectedFailed int32 expectedConditions []batch.JobCondition + + // features + enableJobManagedBy bool + enableJobPodReplacementPolicy bool }{ "backoffLimit 0 should have 1 pod active": { parallelism: 1, @@ -6618,10 +7159,100 @@ func TestJobBackoffForOnFailure(t *testing.T) { }, }, }, + "finished job; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + + parallelism: 2, + completions: 4, + backoffLimit: 6, + suspend: true, + restartCounts: []int32{1, 1, 2, 0}, + podPhase: v1.PodSucceeded, + expectedActive: 0, + expectedSucceeded: 4, + expectedFailed: 0, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + "finished job; JobManagedBy enabled": { + enableJobManagedBy: true, + + parallelism: 2, + completions: 4, + backoffLimit: 6, + suspend: true, + restartCounts: []int32{1, 1, 2, 0}, + podPhase: v1.PodSucceeded, + expectedActive: 0, + expectedSucceeded: 4, + expectedFailed: 0, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + "too many job failures with podRunning - multiple pods; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + + parallelism: 2, + completions: 5, + backoffLimit: 2, + suspend: false, + 
restartCounts: []int32{1, 1}, + podPhase: v1.PodRunning, + expectedActive: 0, + expectedSucceeded: 0, + expectedFailed: 2, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, + }, + }, + "too many job failures with podRunning - multiple pods; JobManagedBy enabled": { + enableJobManagedBy: true, + + parallelism: 2, + completions: 5, + backoffLimit: 2, + suspend: false, + restartCounts: []int32{1, 1}, + podPhase: v1.PodRunning, + expectedActive: 0, + expectedSucceeded: 0, + expectedFailed: 2, + expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) @@ -6723,6 +7354,12 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { expectedSucceeded: 0, expectedFailed: 2, expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, @@ -6754,6 +7391,12 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { expectedSucceeded: 0, expectedFailed: 7, expectedConditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batch.JobReasonBackoffLimitExceeded, + Message: "Job has reached the specified backoff limit", + }, { Type: batch.JobFailed, Status: v1.ConditionTrue, diff --git a/pkg/controller/job/success_policy.go b/pkg/controller/job/success_policy.go index c70c1d3289d..d9319f37804 100644 --- a/pkg/controller/job/success_policy.go +++ b/pkg/controller/job/success_policy.go @@ -50,7 +50,7 @@ func matchSuccessPolicy(logger klog.Logger, successPolicy *batch.SuccessPolicy, } func hasSuccessCriteriaMetCondition(job *batch.Job) *batch.JobCondition { - if feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) { + if feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) || delayTerminalCondition() { successCriteriaMet := findConditionByType(job.Status.Conditions, batch.JobSuccessCriteriaMet) if successCriteriaMet != nil && successCriteriaMet.Status == v1.ConditionTrue { return successCriteriaMet @@ -60,7 +60,7 @@ func hasSuccessCriteriaMetCondition(job *batch.Job) *batch.JobCondition { } func isSuccessCriteriaMetCondition(cond *batch.JobCondition) bool { - return feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) && + return (feature.DefaultFeatureGate.Enabled(features.JobSuccessPolicy) || delayTerminalCondition()) && cond != nil && cond.Type == batch.JobSuccessCriteriaMet && cond.Status == v1.ConditionTrue } diff --git a/pkg/registry/batch/job/strategy.go 
b/pkg/registry/batch/job/strategy.go index e39679a545a..333c98b45b6 100644 --- a/pkg/registry/batch/job/strategy.go +++ b/pkg/registry/batch/job/strategy.go @@ -376,12 +376,15 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt isJobCompleteChanged := batchvalidation.IsJobComplete(oldJob) != batchvalidation.IsJobComplete(newJob) isJobFailedChanged := batchvalidation.IsJobFailed(oldJob) != batchvalidation.IsJobFailed(newJob) isJobFailureTargetChanged := batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobFailureTarget) != batchvalidation.IsConditionTrue(newJob.Status.Conditions, batch.JobFailureTarget) + isJobSuccessCriteriaMetChanged := batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuccessCriteriaMet) != batchvalidation.IsConditionTrue(newJob.Status.Conditions, batch.JobSuccessCriteriaMet) isCompletedIndexesChanged := oldJob.Status.CompletedIndexes != newJob.Status.CompletedIndexes isFailedIndexesChanged := !ptr.Equal(oldJob.Status.FailedIndexes, newJob.Status.FailedIndexes) isActiveChanged := oldJob.Status.Active != newJob.Status.Active isStartTimeChanged := !ptr.Equal(oldJob.Status.StartTime, newJob.Status.StartTime) isCompletionTimeChanged := !ptr.Equal(oldJob.Status.CompletionTime, newJob.Status.CompletionTime) isUncountedTerminatedPodsChanged := !apiequality.Semantic.DeepEqual(oldJob.Status.UncountedTerminatedPods, newJob.Status.UncountedTerminatedPods) + isReadyChanged := !ptr.Equal(oldJob.Status.Ready, newJob.Status.Ready) + isTerminatingChanged := !ptr.Equal(oldJob.Status.Terminating, newJob.Status.Terminating) return batchvalidation.JobStatusValidationOptions{ // We allow to decrease the counter for succeeded pods for jobs which @@ -394,6 +397,8 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt RejectCompletedIndexesForNonIndexedJob: isCompletedIndexesChanged, RejectFailedIndexesForNoBackoffLimitPerIndex: isFailedIndexesChanged, RejectFailedIndexesOverlappingCompleted: isFailedIndexesChanged || isCompletedIndexesChanged, + RejectFailedJobWithoutFailureTarget: isJobFailedChanged || isFailedIndexesChanged, + RejectCompleteJobWithoutSuccessCriteriaMet: isJobCompleteChanged || isJobSuccessCriteriaMetChanged, RejectFinishedJobWithActivePods: isJobFinishedChanged || isActiveChanged, RejectFinishedJobWithoutStartTime: isJobFinishedChanged || isStartTimeChanged, RejectFinishedJobWithUncountedTerminatedPods: isJobFinishedChanged || isUncountedTerminatedPodsChanged, @@ -404,9 +409,19 @@ func getStatusValidationOptions(newJob, oldJob *batch.Job) batchvalidation.JobSt RejectCompleteJobWithoutCompletionTime: isJobCompleteChanged || isCompletionTimeChanged, RejectCompleteJobWithFailedCondition: isJobCompleteChanged || isJobFailedChanged, RejectCompleteJobWithFailureTargetCondition: isJobCompleteChanged || isJobFailureTargetChanged, + AllowForSuccessCriteriaMetInExtendedScope: true, + RejectMoreReadyThanActivePods: isReadyChanged || isActiveChanged, + RejectFinishedJobWithTerminatingPods: isJobFinishedChanged || isTerminatingChanged, } } - return batchvalidation.JobStatusValidationOptions{} + if utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { + return batchvalidation.JobStatusValidationOptions{ + AllowForSuccessCriteriaMetInExtendedScope: true, + } + } + return batchvalidation.JobStatusValidationOptions{ + AllowForSuccessCriteriaMetInExtendedScope: batchvalidation.IsConditionTrue(oldJob.Status.Conditions, batch.JobSuccessCriteriaMet), + } } // WarningsOnUpdate returns 
warnings for the given update. diff --git a/pkg/registry/batch/job/strategy_test.go b/pkg/registry/batch/job/strategy_test.go index 15fd8441ec6..9d87531ddc2 100644 --- a/pkg/registry/batch/job/strategy_test.go +++ b/pkg/registry/batch/job/strategy_test.go @@ -2063,8 +2063,9 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { nowPlusMinute := metav1.Time{Time: now.Add(time.Minute)} cases := map[string]struct { - enableJobManagedBy bool - enableJobSuccessPolicy bool + enableJobManagedBy bool + enableJobSuccessPolicy bool + enableJobPodReplacementPolicy bool job *batch.Job newJob *batch.Job @@ -2154,6 +2155,51 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { }, }, }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid addition of Complete=True without SuccessCriteriaMet=True": { + enableJobManagedBy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + CompletionTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: api.ConditionTrue, + }, + }, + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid addition of Failed=True without FailureTarget=True": { + enableJobManagedBy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: api.ConditionTrue, + }, + }, + }, + }, wantErrs: field.ErrorList{ {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, }, @@ -2178,11 +2224,23 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, Status: batch.JobStatus{ Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, { Type: batch.JobFailed, Status: api.ConditionTrue, @@ -2198,12 +2256,24 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, Status: batch.JobStatus{ CompletionTime: &now, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, { Type: batch.JobComplete, Status: api.ConditionTrue, @@ -2219,6 +2289,16 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Active: 1, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, @@ -2227,6 +2307,10 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { CompletionTime: &now, Active: 1, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, { 
Type: batch.JobComplete, Status: api.ConditionTrue, @@ -2238,30 +2322,94 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "status.active"}, }, }, - "transition to Failed condition with terminating>0 and ready>0": { + "invalid attempt to transition to Failed=True with terminating > 0": { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + }, + Terminating: ptr.To[int32](1), + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, Status: batch.JobStatus{ StartTime: &now, Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, { Type: batch.JobFailed, Status: api.ConditionTrue, }, }, Terminating: ptr.To[int32](1), - Ready: ptr.To[int32](1), }, }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.terminating"}, + }, + }, + "invalid attempt to transition to Failed=True with active > 0": { + enableJobManagedBy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + }, + Active: 1, + }, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + { + Type: batch.JobFailed, + Status: api.ConditionTrue, + }, + }, + Active: 1, + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.active"}, + }, }, "invalid attempt to transition to Failed=True with uncountedTerminatedPods.Failed>0": { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, @@ -2271,6 +2419,10 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { Failed: []types.UID{"a"}, }, Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, { Type: batch.JobFailed, Status: api.ConditionTrue, @@ -2363,6 +2515,18 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a"}, + }, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ ObjectMeta: validObjectMeta, @@ -2373,6 +2537,10 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { Succeeded: []types.UID{"a"}, }, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, { Type: batch.JobComplete, Status: api.ConditionTrue, @@ -2388,12 +2556,25 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { enableJobManagedBy: true, job: &batch.Job{ ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + StartTime: &now, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + }, + }, }, newJob: &batch.Job{ 
ObjectMeta: validObjectMeta, Status: batch.JobStatus{ StartTime: &now, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, { Type: batch.JobComplete, Status: api.ConditionTrue, @@ -2499,6 +2680,12 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { ObjectMeta: validObjectMeta, Status: batch.JobStatus{ StartTime: &nowPlusMinute, + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + }, }, }, newJob: &batch.Job{ @@ -2507,6 +2694,10 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { StartTime: &nowPlusMinute, CompletionTime: &now, Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, { Type: batch.JobComplete, Status: api.ConditionTrue, @@ -2941,6 +3132,7 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { }, wantErrs: field.ErrorList{ {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, }, }, "invalid failedIndexes, which overlap with completedIndexes": { @@ -3021,6 +3213,37 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, }, }, + "valid update of Job if SuccessCriteriaMet already present for NonIndexed Jobs; JobSuccessPolicy enabled, while JobManagedBy and JobPodReplacementPolicy disabled": { + enableJobSuccessPolicy: true, + enableJobManagedBy: false, + enableJobPodReplacementPolicy: false, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{}, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }}, + }, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{}, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }, + { + Type: batch.JobComplete, + Status: api.ConditionTrue, + }, + }, + }, + }, + }, "invalid addition of SuccessCriteriaMet for Job with Failed": { enableJobSuccessPolicy: true, job: &batch.Job{ @@ -3378,11 +3601,64 @@ func TestStatusStrategy_ValidateUpdate(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, }, }, + "valid addition of SuccessCriteriaMet when JobManagedBy is enabled": { + enableJobManagedBy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }}, + }, + }, + }, + "valid addition of SuccessCriteriaMet when JobPodReplacementPolicy is enabled": { + enableJobPodReplacementPolicy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{{ + Type: batch.JobSuccessCriteriaMet, + Status: api.ConditionTrue, + }}, + }, + }, + }, + "invalid attempt to set more ready pods than active": { + enableJobManagedBy: true, + job: &batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + }, + }, + newJob: &batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + }, + Status: batch.JobStatus{ + Active: 1, + Ready: ptr.To[int32](2), + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.ready"}, + }, + }, } for 
name, tc := range cases { t.Run(name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy) errs := StatusStrategy.ValidateUpdate(ctx, tc.newJob, tc.job) if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" { diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index ea6abb968d6..5f404b0e30d 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "strconv" - "time" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -35,9 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" @@ -705,9 +702,20 @@ done`} job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + ginkgo.By("Awaiting for the job to have the interim success condition") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobSuccessCriteriaMet, "") + framework.ExpectNoError(err, "failed to ensure job has the interim success condition: %s", f.Namespace.Name) + ginkgo.By("Ensuring job reaches completions") err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) + + ginkgo.By("Verifying the Job status fields to ensure correct final state") + job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to retrieve latest job object") + gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0))) + gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0))) + gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0))) }) ginkgo.It("should fail when exceeds active deadline", func(ctx context.Context) { @@ -720,9 +728,21 @@ done`} job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Awaiting for the job to have the interim failure condition") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonDeadlineExceeded) + framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name) + ginkgo.By("Ensuring job past active deadline") - err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonDeadlineExceeded) framework.ExpectNoError(err, "failed to ensure job past active 
deadline in namespace: %s", f.Namespace.Name) + + ginkgo.By("Verifying the Job status fields to ensure correct final state") + job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to retrieve latest job object") + gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0))) + gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0))) + gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0))) }) /* @@ -823,9 +843,13 @@ done`} job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff)) job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) - ginkgo.By("Ensuring job exceed backofflimit") - err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded") + ginkgo.By("Awaiting for the job to have the interim failure condition") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailureTarget, batchv1.JobReasonBackoffLimitExceeded) + framework.ExpectNoError(err, "failed to ensure job has the interim failure condition: %s", f.Namespace.Name) + + ginkgo.By("Ensuring job exceed backofflimit") + err = e2ejob.WaitForJobCondition(ctx, f.ClientSet, f.Namespace.Name, job.Name, batchv1.JobFailed, batchv1.JobReasonBackoffLimitExceeded) framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name) ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1)) @@ -835,6 +859,13 @@ done`} for _, pod := range pods.Items { gomega.Expect(pod.Status.Phase).To(gomega.Equal(v1.PodFailed)) } + + ginkgo.By("Verifying the Job status fields to ensure correct final state") + job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err, "failed to retrieve latest job object") + gomega.Expect(job.Status.Active).Should(gomega.Equal(int32(0))) + gomega.Expect(job.Status.Ready).Should(gomega.Equal(ptr.To[int32](0))) + gomega.Expect(job.Status.Terminating).Should(gomega.Equal(ptr.To[int32](0))) }) f.It("should run a job to completion with CPU requests", f.WithSerial(), func(ctx context.Context) { @@ -1186,24 +1217,6 @@ func waitForJobEvent(ctx context.Context, config watchEventConfig) { } } -// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. 
-func waitForJobFailure(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error { - return wait.Poll(framework.Poll, timeout, func() (bool, error) { - curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) - if err != nil { - return false, err - } - for _, c := range curr.Status.Conditions { - if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue { - if reason == "" || reason == c.Reason { - return true, nil - } - } - } - return false, nil - }) -} - func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition { for i := range list { if list[i].Type == cType { diff --git a/test/e2e/framework/job/wait.go b/test/e2e/framework/job/wait.go index 40e2709fdaa..2c994bbf0e7 100644 --- a/test/e2e/framework/job/wait.go +++ b/test/e2e/framework/job/wait.go @@ -69,13 +69,16 @@ func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobNa // WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns. func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, completions int32) error { - return wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) { curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) if err != nil { return false, err } return curr.Status.Succeeded == completions, nil - }) + }); err != nil { + return err + } + return WaitForJobCondition(ctx, c, ns, jobName, batchv1.JobComplete, "") } // WaitForJobReady waits for particular value of the Job .status.ready field @@ -112,6 +115,28 @@ func WaitForJobFailed(c clientset.Interface, ns, jobName string) error { }) } +// WaitForJobCondition waits for the specified Job to have the expected condition with the specific reason. +func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason string) error { + err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) { + curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) + if err != nil { + return false, err + } + for _, c := range curr.Status.Conditions { + if c.Type == cType && c.Status == v1.ConditionTrue { + if reason == c.Reason { + return true, nil + } + } + } + return false, nil + }) + if err != nil { + return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %q: %w", jobName, cType, reason, err) + } + return nil +} + func isJobFailed(j *batchv1.Job) bool { for _, c := range j.Status.Conditions { if (c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue { diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 2efafd86b07..e924030d713 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" eventsv1 "k8s.io/api/events/v1" @@ -1160,6 +1161,301 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) } } +// TestDelayTerminalPhaseCondition tests the fix for the Job controller to delay +// setting the terminal phase conditions (Failed and Complete) until all Pods +// are terminal.
The fate of the Job is indicated by the interim Job conditions: +// FailureTarget, or SuccessCriteriaMet. +func TestDelayTerminalPhaseCondition(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) + + podTemplateSpec := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{"fake.example.com/blockDeletion"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main-container", + Image: "foo", + ImagePullPolicy: v1.PullIfNotPresent, + TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, + }, + }, + }, + } + failOnePod := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) { + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodFailed, err) + } + } + succeedOnePodAndScaleDown := func(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) { + // mark one pod as succeeded + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err) + } + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + if _, err := updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { + j.Spec.Parallelism = ptr.To[int32](1) + j.Spec.Completions = ptr.To[int32](1) + }); err != nil { + t.Fatalf("Unexpected error when scaling down the job: %v", err) + } + } + + testCases := map[string]struct { + enableJobManagedBy bool + enableJobPodReplacementPolicy bool + + job batchv1.Job + action func(context.Context, clientset.Interface, *batchv1.Job) + wantInterimStatus *batchv1.JobStatus + wantTerminalStatus batchv1.JobStatus + }{ + "job backoff limit exceeded; JobPodReplacementPolicy and JobManagedBy disabled": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + Template: podTemplateSpec, + BackoffLimit: ptr.To[int32](0), + }, + }, + action: failOnePod, + wantTerminalStatus: batchv1.JobStatus{ + Failed: 2, + Ready: ptr.To[int32](0), + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + }, + }, + }, + "job backoff limit exceeded; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + Template: podTemplateSpec, + BackoffLimit: ptr.To[int32](0), + }, + }, + action: failOnePod, + wantInterimStatus: &batchv1.JobStatus{ + Failed: 2, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](1), + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Failed: 2, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + { + Type: batchv1.JobFailed, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + }, + }, + }, + "job backoff limit exceeded; JobManagedBy enabled": { + enableJobManagedBy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + Template: podTemplateSpec, + BackoffLimit: 
ptr.To[int32](0), + }, + }, + action: failOnePod, + wantInterimStatus: &batchv1.JobStatus{ + Failed: 2, + Ready: ptr.To[int32](0), + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Failed: 2, + Ready: ptr.To[int32](0), + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + { + Type: batchv1.JobFailed, + Status: v1.ConditionTrue, + Reason: batchv1.JobReasonBackoffLimitExceeded, + }, + }, + }, + }, + "job scale down to meet completions; JobPodReplacementPolicy and JobManagedBy disabled": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + action: succeedOnePodAndScaleDown, + wantTerminalStatus: batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + }, + "job scale down to meet completions; JobPodReplacementPolicy enabled": { + enableJobPodReplacementPolicy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + action: succeedOnePodAndScaleDown, + wantInterimStatus: &batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](1), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + { + Type: batchv1.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + }, + "job scale down to meet completions; JobManagedBy enabled": { + enableJobManagedBy: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Template: podTemplateSpec, + }, + }, + action: succeedOnePodAndScaleDown, + wantInterimStatus: &batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + }, + }, + wantTerminalStatus: batchv1.JobStatus{ + Succeeded: 1, + Ready: ptr.To[int32](0), + CompletedIndexes: "0", + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuccessCriteriaMet, + Status: v1.ConditionTrue, + }, + { + Type: batchv1.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + resetMetrics() + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, test.enableJobPodReplacementPolicy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true) + + closeFn, restConfig, 
clientSet, ns := setup(t, "delay-terminal-condition") + t.Cleanup(closeFn) + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + t.Cleanup(cancel) + + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) + if err != nil { + t.Fatalf("Error %q while creating the job %q", err, jobObj.Name) + } + t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) }) + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + + waitForPodsToBeActive(ctx, t, jobClient, *jobObj.Spec.Parallelism, jobObj) + + test.action(ctx, clientSet, jobObj) + if test.wantInterimStatus != nil { + validateJobStatus(ctx, t, clientSet, jobObj, *test.wantInterimStatus) + + // Set terminal phase to all the remaining pods to simulate + // Kubelet (or other components like PodGC). + jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(s v1.PodStatus) bool { + return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning) + }) + if err != nil { + t.Fatalf("Failed to list Job Pods: %v", err) + } + if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, len(jobPods)); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %v", v1.PodSucceeded, err) + } + } + validateJobStatus(ctx, t, clientSet, jobObj, test.wantTerminalStatus) + }) + } +} + // TestBackoffLimitPerIndex tests handling of job and its pods when // backoff limit per index is used. func TestBackoffLimitPerIndex(t *testing.T) { @@ -1966,16 +2262,14 @@ func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) { t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj)) } - // Mark the job as finished so that the built-in controller receives the + // Trigger termination for the Job so that the built-in controller receives the // UpdateJob event in reaction to which it would remove the pod's finalizer, // if not for the custom managedBy field.
jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{ - Type: batchv1.JobComplete, + Type: batchv1.JobSuccessCriteriaMet, Status: v1.ConditionTrue, }) jobObj.Status.StartTime = ptr.To(metav1.Now()) - jobObj.Status.CompletionTime = ptr.To(metav1.Now()) - if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil { t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj)) } @@ -2821,7 +3115,7 @@ func TestElasticIndexedJob(t *testing.T) { jobUpdates: []jobUpdate{ { completions: ptr.To[int32](0), - wantTerminating: ptr.To[int32](3), + wantTerminating: ptr.To[int32](0), }, }, }, @@ -3595,6 +3889,25 @@ func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, cl } } +func validateJobStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, wantStatus batchv1.JobStatus) { + t.Helper() + diff := "" + if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) { + gotJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get updated Job: %v, last status diff (-want,+got):\n%s", err, diff) + } + diff = cmp.Diff(wantStatus, gotJob.Status, + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(batchv1.JobStatus{}, "StartTime", "UncountedTerminatedPods", "CompletionTime"), + cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastProbeTime", "LastTransitionTime", "Message"), + ) + return diff == "", nil + }); err != nil { + t.Fatalf("Waiting for Job Status: %v\n, Status diff (-want,+got):\n%s", err, diff) + } +} + func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) { t.Helper() validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
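The strategy, e2e, and integration test cases in this patch all exercise the same set of status invariants introduced in validation.go: a terminal Failed/Complete condition must be accompanied by its interim FailureTarget/SuccessCriteriaMet condition, a finished Job must not report active or terminating pods, and ready may never exceed active. The sketch below is only an illustrative re-statement of those invariants against the public batch/v1 API types; it is not the apiserver validation code, and the helper names (hasTrueCondition, checkTerminalStatusInvariants) are invented for this example. It assumes the k8s.io/api and k8s.io/utils modules.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/utils/ptr"
)

// hasTrueCondition reports whether the Job status carries the given condition with Status=True.
func hasTrueCondition(status batchv1.JobStatus, cType batchv1.JobConditionType) bool {
	for _, c := range status.Conditions {
		if c.Type == cType && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

// checkTerminalStatusInvariants collects violations of the invariants exercised by the
// test cases above. It mirrors, but does not replace, the apiserver-side validation.
func checkTerminalStatusInvariants(status batchv1.JobStatus) []string {
	var violations []string
	failed := hasTrueCondition(status, batchv1.JobFailed)
	complete := hasTrueCondition(status, batchv1.JobComplete)
	if failed && !hasTrueCondition(status, batchv1.JobFailureTarget) {
		violations = append(violations, "Failed=True requires the interim FailureTarget=True condition")
	}
	if complete && !hasTrueCondition(status, batchv1.JobSuccessCriteriaMet) {
		violations = append(violations, "Complete=True requires the interim SuccessCriteriaMet=True condition")
	}
	if (failed || complete) && status.Active > 0 {
		violations = append(violations, "a finished Job must not report active pods")
	}
	if (failed || complete) && ptr.Deref(status.Terminating, 0) > 0 {
		violations = append(violations, "a finished Job must not report terminating pods")
	}
	if status.Ready != nil && *status.Ready > status.Active {
		violations = append(violations, "ready must not exceed active")
	}
	return violations
}

func main() {
	// Example: a status that jumps straight to Failed=True while a pod is still terminating.
	status := batchv1.JobStatus{
		Active:      0,
		Terminating: ptr.To[int32](1),
		Conditions: []batchv1.JobCondition{
			{Type: batchv1.JobFailed, Status: v1.ConditionTrue},
		},
	}
	for _, violation := range checkTerminalStatusInvariants(status) {
		fmt.Println("violation:", violation)
	}
}

The interim/terminal status pairs in TestDelayTerminalPhaseCondition follow the same shape: the interim status carries only FailureTarget or SuccessCriteriaMet (possibly with terminating > 0), and the terminal status adds Failed or Complete only once active and terminating have dropped to zero.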