From a15c27661e68695be81d018a1fb81683881f4266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Wo=C5=BAniak?= Date: Tue, 18 Jul 2023 22:44:11 +0200 Subject: [PATCH] Job controller implementation of backoff limit per index (#118009) --- pkg/controller/job/backoff_utils.go | 26 +- pkg/controller/job/backoff_utils_test.go | 44 ++ pkg/controller/job/indexed_job_utils.go | 204 ++++- pkg/controller/job/indexed_job_utils_test.go | 483 ++++++++++++ pkg/controller/job/job_controller.go | 110 ++- pkg/controller/job/job_controller_test.go | 673 +++++++++++++++- pkg/controller/job/pod_failure_policy.go | 20 +- pkg/controller/job/pod_failure_policy_test.go | 138 +++- test/integration/job/job_test.go | 720 +++++++++++++++++- 9 files changed, 2345 insertions(+), 73 deletions(-) diff --git a/pkg/controller/job/backoff_utils.go b/pkg/controller/job/backoff_utils.go index ce68468f1d0..2e89e5a1ab3 100644 --- a/pkg/controller/job/backoff_utils.go +++ b/pkg/controller/job/backoff_utils.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/utils/clock" "k8s.io/utils/pointer" @@ -213,12 +214,31 @@ func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time { } func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration { - if backoff.failuresAfterLastSuccess == 0 { + return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime) +} + +// getRemainingTimePerIndex returns the remaining time for a given index to +// create the replacement pods. The number of consecutive pod failures for the +// index is retrieved from the `job-index-failure-count` annotation of the +// last failed pod within the index (represented by `lastFailedPod`). +// The last failed pod is also used to determine the time of the last failure.
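// For example (illustrative note, not part of the original patch): with
// defaultBackoff=5s and maxBackoff=700s, a last failed pod whose absolute
// failure count (counted plus ignored failures) is 2 yields failureCount=3
// and a backoff of 5s*2^2=20s, as exercised by the
// TestGetRemainingBackoffTimePerIndex cases below.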
+func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration { + if lastFailedPod == nil { + // There is no previous failed pod for this index + return time.Duration(0) + } + failureCount := getIndexAbsoluteFailureCount(logger, lastFailedPod) + 1 + lastFailureTime := getFinishedTime(lastFailedPod) + return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, failureCount, &lastFailureTime) +} + +func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration { + if failuresCount == 0 { return 0 } backoffDuration := defaultBackoff - for i := 1; i < int(backoff.failuresAfterLastSuccess); i++ { + for i := 1; i < int(failuresCount); i++ { backoffDuration = backoffDuration * 2 if backoffDuration >= maxBackoff { backoffDuration = maxBackoff @@ -226,7 +246,7 @@ func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBac } } - timeElapsedSinceLastFailure := clock.Since(*backoff.lastFailureTime) + timeElapsedSinceLastFailure := clock.Since(*lastFailureTime) if backoffDuration < timeElapsedSinceLastFailure { return 0 diff --git a/pkg/controller/job/backoff_utils_test.go b/pkg/controller/job/backoff_utils_test.go index 68077c44f3d..f94d51efad9 100644 --- a/pkg/controller/job/backoff_utils_test.go +++ b/pkg/controller/job/backoff_utils_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" ) @@ -466,3 +467,46 @@ func TestGetRemainingBackoffTime(t *testing.T) { }) } } + +func TestGetRemainingBackoffTimePerIndex(t *testing.T) { + defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + testCases := map[string]struct { + currentTime time.Time + maxBackoff time.Duration + defaultBackoff time.Duration + lastFailedPod *v1.Pod + wantDuration time.Duration + }{ + "no failures": { + lastFailedPod: nil, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 0 * time.Second, + }, + "two prev failures; current time and failure time are same": { + lastFailedPod: buildPod().phase(v1.PodFailed).indexFailureCount("2").customDeletionTimestamp(defaultTestTime.Time).Pod, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 20 * time.Second, + }, + "one prev failure counted and one ignored; current time and failure time are same": { + lastFailedPod: buildPod().phase(v1.PodFailed).indexFailureCount("1").indexIgnoredFailureCount("1").customDeletionTimestamp(defaultTestTime.Time).Pod, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 20 * time.Second, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + fakeClock := clocktesting.NewFakeClock(tc.currentTime.Truncate(time.Second)) + d := getRemainingTimePerIndex(logger, fakeClock, tc.defaultBackoff, tc.maxBackoff, tc.lastFailedPod) + if d.Seconds() != tc.wantDuration.Seconds() { + t.Errorf("Expected value of duration %v; got %v", tc.wantDuration, d) + } + }) + } +} diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 
49f42b18e61..8d5bc8bad2d 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "math" "sort" "strconv" "strings" @@ -41,6 +42,10 @@ func isIndexedJob(job *batch.Job) bool { return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion } +func hasBackoffLimitPerIndex(job *batch.Job) bool { + return feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) && job.Spec.BackoffLimitPerIndex != nil +} + type interval struct { First int Last int @@ -54,7 +59,7 @@ type orderedIntervals []interval // empty list if this Job is not tracked with finalizers. The new list includes // the indexes that succeeded since the last sync. func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { - prevIntervals := succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) + prevIntervals := parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) newSucceeded := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) @@ -69,9 +74,55 @@ func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Po return prevIntervals, result } +// calculateFailedIndexes returns the list of failed indexes in compressed +// format (intervals). The list includes indexes already present in +// .status.failedIndexes and indexes that failed since the last sync. +func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) *orderedIntervals { + var prevIntervals orderedIntervals + if job.Status.FailedIndexes != nil { + prevIntervals = parseIndexesFromString(logger, *job.Status.FailedIndexes, int(*job.Spec.Completions)) + } + newFailed := sets.New[int]() + for _, p := range pods { + ix := getCompletionIndex(p.Annotations) + // A failed pod with a valid index and a tracking finalizer (meaning it has not been counted yet). + if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) && isIndexFailed(logger, job, p) { + newFailed.Insert(ix) + } + } + // List returns the items of the set in order. + result := prevIntervals.withOrderedIndexes(sets.List(newFailed)) + return &result +} + +func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool { + isPodFailedCounted := false + if isPodFailed(pod, job) { + if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { + _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod) + if action != nil && *action == batch.PodFailurePolicyActionFailIndex { + return true + } + isPodFailedCounted = countFailed + } else { + isPodFailedCounted = true + } + } + return isPodFailedCounted && getIndexFailureCount(logger, pod) >= *job.Spec.BackoffLimitPerIndex +} + // withOrderedIndexes returns a new list of ordered intervals that contains // the newIndexes, provided in increasing order. func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals { + newIndexIntervals := make(orderedIntervals, len(newIndexes)) + for i, newIndex := range newIndexes { + newIndexIntervals[i] = interval{newIndex, newIndex} + } + return oi.merge(newIndexIntervals) +} + +// merge returns a new list of ordered intervals that contains the intervals from both lists, merged in order.
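// For example (illustrative note, not part of the original patch): merging
// [{1,3},{9,9}] with [{4,5},{7,7}] yields [{1,5},{7,7},{9,9}]; the adjacent
// intervals {1,3} and {4,5} coalesce because 3 >= 4-1.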
+func (oi orderedIntervals) merge(newOi orderedIntervals) orderedIntervals { var result orderedIntervals i := 0 j := 0 @@ -84,12 +135,12 @@ func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals lastInterval.Last = thisInterval.Last } } - for i < len(oi) && j < len(newIndexes) { - if oi[i].First < newIndexes[j] { + for i < len(oi) && j < len(newOi) { + if oi[i].First < newOi[j].First { appendOrMergeWithLastInterval(oi[i]) i++ } else { - appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]}) + appendOrMergeWithLastInterval(newOi[j]) j++ } } @@ -97,8 +148,8 @@ func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals appendOrMergeWithLastInterval(oi[i]) i++ } - for j < len(newIndexes) { - appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]}) + for j < len(newOi) { + appendOrMergeWithLastInterval(newOi[j]) j++ } return result @@ -150,19 +201,19 @@ func (oi orderedIntervals) has(ix int) bool { return oi[hi].First <= ix } -func succeededIndexesFromString(logger klog.Logger, completedIndexes string, completions int) orderedIntervals { - if completedIndexes == "" { +func parseIndexesFromString(logger klog.Logger, indexesStr string, completions int) orderedIntervals { + if indexesStr == "" { return nil } var result orderedIntervals var lastInterval *interval - for _, intervalStr := range strings.Split(completedIndexes, ",") { + for _, intervalStr := range strings.Split(indexesStr, ",") { limitsStr := strings.Split(intervalStr, "-") var inter interval var err error inter.First, err = strconv.Atoi(limitsStr[0]) if err != nil { - logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) + logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.First >= completions { @@ -171,7 +222,7 @@ func succeededIndexesFromString(logger klog.Logger, completedIndexes string, com if len(limitsStr) > 1 { inter.Last, err = strconv.Atoi(limitsStr[1]) if err != nil { - logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) + logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.Last >= completions { @@ -191,20 +242,17 @@ func succeededIndexesFromString(logger klog.Logger, completedIndexes string, com } // firstPendingIndexes returns `count` indexes less than `completions` that are -// not covered by `activePods` or `succeededIndexes`. +// not covered by `activePods`, `succeededIndexes` or `failedIndexes`. func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int { if count == 0 { return nil } - active := sets.New[int]() - for _, p := range jobCtx.activePods { - ix := getCompletionIndex(p.Annotations) - if ix != unknownCompletionIndex { - active.Insert(ix) - } - } + active := getIndexes(jobCtx.activePods) result := make([]int, 0, count) nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active)) + if jobCtx.failedIndexes != nil { + nonPending = nonPending.merge(*jobCtx.failedIndexes) + } // The following algorithm is bounded by len(nonPending) and count. 
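// Illustrative walk-through (not part of the original patch), mirroring the
// "with failed indexes" test case: succeededIndexes=[{1,1},{5,5},{9,9}],
// active indexes {3,9}, and failedIndexes=[{2,2},{6,7}] merge into
// nonPending=[{1,3},{5,7},{9,9}]; the scan below then yields the pending
// indexes 0,4,8,10,11 for count=5 and completions=20.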
candidate := 0 for _, sInterval := range nonPending { @@ -221,6 +269,18 @@ return result } +// getIndexes returns the set of completion indexes of the given pods. +func getIndexes(pods []*v1.Pod) sets.Set[int] { + result := sets.New[int]() + for _, p := range pods { + ix := getCompletionIndex(p.Annotations) + if ix != unknownCompletionIndex { + result.Insert(ix) + } + } + return result +} + // appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated // completion indexes. For each index, it selects n-1 pods for removal, where n // is the number of repetitions. The pods to be removed are appended to `rm`, @@ -248,6 +308,69 @@ func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions i return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex) } +// getPodsWithDelayedDeletionPerIndex returns, per index, the pod whose removal +// is delayed while awaiting its recreation. This map is used when BackoffLimitPerIndex +// is enabled to delay pod finalizer removal, and thus pod deletion, until the +// replacement pod is created. The pod deletion is delayed so that the +// replacement pod can have the batch.kubernetes.io/job-index-failure-count +// annotation set properly, keeping track of the number of failed pods within +// the index. +func getPodsWithDelayedDeletionPerIndex(logger klog.Logger, jobCtx *syncJobCtx) map[int]*v1.Pod { + // The failed pods corresponding to currently active indexes can be safely + // deleted, as the failure count annotation is already present on the currently + // active pods. + activeIndexes := getIndexes(jobCtx.activePods) + + podsWithDelayedDeletionPerIndex := make(map[int]*v1.Pod) + getValidPodsWithFilter(jobCtx, nil, func(p *v1.Pod) bool { + if isPodFailed(p, jobCtx.job) { + if ix := getCompletionIndex(p.Annotations); ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) { + if jobCtx.succeededIndexes.has(ix) || jobCtx.failedIndexes.has(ix) || activeIndexes.Has(ix) { + return false + } + if lastPodWithDelayedDeletion, ok := podsWithDelayedDeletionPerIndex[ix]; ok { + if getIndexAbsoluteFailureCount(logger, lastPodWithDelayedDeletion) <= getIndexAbsoluteFailureCount(logger, p) && !getFinishedTime(p).Before(getFinishedTime(lastPodWithDelayedDeletion)) { + podsWithDelayedDeletionPerIndex[ix] = p + } + } else { + podsWithDelayedDeletionPerIndex[ix] = p + } + } + } + return false + }) + return podsWithDelayedDeletionPerIndex +} + +func addIndexFailureCountAnnotation(logger klog.Logger, template *v1.PodTemplateSpec, job *batch.Job, podBeingReplaced *v1.Pod) { + indexFailureCount, indexIgnoredFailureCount := getNewIndexFailureCounts(logger, job, podBeingReplaced) + template.Annotations[batch.JobIndexFailureCountAnnotation] = strconv.Itoa(int(indexFailureCount)) + if indexIgnoredFailureCount > 0 { + template.Annotations[batch.JobIndexIgnoredFailureCountAnnotation] = strconv.Itoa(int(indexIgnoredFailureCount)) + } +} + +// getNewIndexFailureCounts returns the values of the index failure count and +// the index ignored failure count annotations for the new pod being created. +func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplaced *v1.Pod) (int32, int32) { + if podBeingReplaced != nil { + indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced) + indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced) + if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) &&
job.Spec.PodFailurePolicy != nil { + _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced) + if countFailed { + indexFailureCount++ + } else { + indexIgnoredFailureCount++ + } + } else { + indexFailureCount++ + } + return indexFailureCount, indexIgnoredFailureCount + } + return 0, 0 +} + func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) { if ix == unknownCompletionIndex { rm = append(rm, pods...) @@ -281,6 +404,49 @@ func getCompletionIndex(annotations map[string]string) int { return i } +// getIndexFailureCount returns the value of the batch.kubernetes.io/job-index-failure-count +// annotation as int32. It falls back to 0 when: +// - there is no annotation - for example, the pod was created while the BackoffLimitPerIndex +// feature was temporarily disabled, or the annotation was manually removed by the user, +// - the value of the annotation isn't parsable as an int - for example because +// it was set by a malicious user, +// - the value of the annotation is negative or greater than math.MaxInt32 - for example +// because it was set by a malicious user. +func getIndexFailureCount(logger klog.Logger, pod *v1.Pod) int32 { + return parseIndexFailureCountAnnotation(logger, pod) +} + +func getIndexAbsoluteFailureCount(logger klog.Logger, pod *v1.Pod) int32 { + return parseIndexFailureCountAnnotation(logger, pod) + parseIndexFailureIgnoreCountAnnotation(logger, pod) +} + +func parseIndexFailureCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 { + if value, ok := pod.Annotations[batch.JobIndexFailureCountAnnotation]; ok { + return parseInt32(logger, value) + } + logger.V(3).Info("Expected annotation not found", "annotationKey", batch.JobIndexFailureCountAnnotation, "pod", klog.KObj(pod), "podUID", pod.UID) + return 0 +} + +func parseIndexFailureIgnoreCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 { + if value, ok := pod.Annotations[batch.JobIndexIgnoredFailureCountAnnotation]; ok { + return parseInt32(logger, value) + } + return 0 +} + +func parseInt32(logger klog.Logger, vStr string) int32 { + if vInt, err := strconv.Atoi(vStr); err != nil { + logger.Error(err, "Failed to parse the value", "value", vStr) + return 0 + } else if vInt < 0 || vInt > math.MaxInt32 { + logger.Info("The value is invalid", "value", vInt) + return 0 + } else { + return int32(vInt) + } +} + func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) { for i := range template.Spec.InitContainers { addCompletionIndexEnvVariable(&template.Spec.InitContainers[i]) diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index 1280bc3ab56..9d6ccc901c9 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -17,12 +17,20 @@ limitations under the License.
package job import ( + "math" + "strconv" "testing" + "time" "github.com/google/go-cmp/cmp" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2/ktesting" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" ) @@ -219,6 +227,427 @@ func TestCalculateSucceededIndexes(t *testing.T) { } } +func TestIsIndexFailed(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cases := map[string]struct { + enableJobPodFailurePolicy bool + job batch.Job + pod *v1.Pod + wantResult bool + }{ + "failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=0": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pod: buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + wantResult: true, + }, + "failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=1": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pod: buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod, + wantResult: true, + }, + "matching FailIndex pod failure policy; JobPodFailurePolicy enabled": { + enableJobPodFailurePolicy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{3}, + }, + }, + }, + }, + }, + }, + pod: buildPod().indexFailureCount("0").status(v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 3, + }, + }, + }, + }, + }).index("0").trackingFinalizer().Pod, + wantResult: true, + }, + "matching FailIndex pod failure policy; JobPodFailurePolicy disabled": { + enableJobPodFailurePolicy: false, + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{3}, + }, + }, + }, + }, + }, + }, + pod: buildPod().indexFailureCount("0").status(v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 3, + }, + }, + }, + }, + }).index("0").trackingFinalizer().Pod, + wantResult: false, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() + gotResult := isIndexFailed(logger, &tc.job, tc.pod) + if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" { + t.Errorf("Unexpected result (-want,+got):\n%s", diff) + } + }) + } +} + +func 
TestCalculateFailedIndexes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cases := map[string]struct { + enableJobPodFailurePolicy bool + job batch.Job + pods []*v1.Pod + wantPrevFailedIndexes orderedIntervals + wantFailedIndexes orderedIntervals + }{ + "one new index failed": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod, + }, + wantFailedIndexes: []interval{{1, 1}}, + }, + "pod without finalizer is ignored": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").Pod, + }, + wantFailedIndexes: nil, + }, + "pod outside completions is ignored": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("3").Pod, + }, + wantFailedIndexes: nil, + }, + "extend the failed indexes": { + job: batch.Job{ + Status: batch.JobStatus{ + FailedIndexes: pointer.String("0"), + }, + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod, + }, + wantFailedIndexes: []interval{{0, 1}}, + }, + "prev failed indexes empty": { + job: batch.Job{ + Status: batch.JobStatus{ + FailedIndexes: pointer.String(""), + }, + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod, + }, + wantFailedIndexes: []interval{{1, 1}}, + }, + "prev failed indexes outside the completions": { + job: batch.Job{ + Status: batch.JobStatus{ + FailedIndexes: pointer.String("9"), + }, + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod, + }, + wantFailedIndexes: []interval{{1, 1}}, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods) + if diff := cmp.Diff(&tc.wantFailedIndexes, failedIndexes); diff != "" { + t.Errorf("Unexpected failed indexes (-want,+got):\n%s", diff) + } + }) + } +} + +func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + now := time.Now() + cases := map[string]struct { + enableJobPodFailurePolicy bool + job batch.Job + pods []*v1.Pod + expectedRmFinalizers sets.Set[string] + wantPodsWithDelayedDeletionPerIndex []string + }{ + "failed pods corresponding to non-failed indexes are kept": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(3), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().uid("b").indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
buildPod().uid("c").indexFailureCount("0").phase(v1.PodFailed).index("2").trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{"a", "c"}, + }, + "failed pod without finalizer; the pod's deletion is not delayed as it already started": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{}, + }, + "failed pod with expected finalizer removal; the pod's deletion is not delayed as it already started": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + }, + expectedRmFinalizers: sets.New("a"), + wantPodsWithDelayedDeletionPerIndex: []string{}, + }, + "failed pod with index outside of completions; the pod's deletion is not delayed": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(0), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("4").trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{}, + }, + "failed pod for active index; the pod's deletion is not delayed as it is already replaced": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().uid("a2").indexFailureCount("1").phase(v1.PodRunning).index("0").trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{}, + }, + "failed pod for succeeded index; the pod's deletion is not delayed as it is already replaced": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().uid("a2").indexFailureCount("1").phase(v1.PodSucceeded).index("0").trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{}, + }, + "multiple failed pods for index with different failure count; only the pod with highest failure count is kept": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(4), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().uid("a3").indexFailureCount("2").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{"a3"}, + }, + "multiple failed pods for index with different finish times; only the last failed pod is kept": { + job: batch.Job{ + Spec: batch.JobSpec{ + Completions: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(4), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a1").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-time.Second)).trackingFinalizer().Pod, + 
buildPod().uid("a3").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now).trackingFinalizer().Pod, + buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-2 * time.Second)).trackingFinalizer().Pod, + }, + wantPodsWithDelayedDeletionPerIndex: []string{"a3"}, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + activePods := controller.FilterActivePods(logger, tc.pods) + failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods) + _, succeededIndexes := calculateSucceededIndexes(logger, &tc.job, tc.pods) + jobCtx := &syncJobCtx{ + job: &tc.job, + pods: tc.pods, + activePods: activePods, + succeededIndexes: succeededIndexes, + failedIndexes: failedIndexes, + expectedRmFinalizers: tc.expectedRmFinalizers, + } + gotPodsWithDelayedDeletionPerIndex := getPodsWithDelayedDeletionPerIndex(logger, jobCtx) + gotPodsWithDelayedDeletionPerIndexSet := sets.New[string]() + for _, pod := range gotPodsWithDelayedDeletionPerIndex { + gotPodsWithDelayedDeletionPerIndexSet.Insert(string(pod.UID)) + } + if diff := cmp.Diff(tc.wantPodsWithDelayedDeletionPerIndex, sets.List(gotPodsWithDelayedDeletionPerIndexSet)); diff != "" { + t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff) + } + }) + } +} + +func TestGetNewIndexFailureCountValue(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cases := map[string]struct { + enableJobPodFailurePolicy bool + job batch.Job + pod *v1.Pod + wantNewIndexFailureCount int32 + wantNewIndexIgnoredFailureCount int32 + }{ + "first pod created": { + job: batch.Job{}, + wantNewIndexFailureCount: 0, + }, + "failed pod being replaced with 0 index failure count": { + job: batch.Job{}, + pod: buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + wantNewIndexFailureCount: 1, + }, + "failed pod being replaced with >0 index failure count": { + job: batch.Job{}, + pod: buildPod().uid("a").indexFailureCount("3").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, + wantNewIndexFailureCount: 4, + }, + "failed pod being replaced, matching the ignore rule; JobPodFailurePolicy enabled": { + enableJobPodFailurePolicy: true, + job: batch.Job{ + Spec: batch.JobSpec{ + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionIgnore, + OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + }, + }, + pod: buildPod().uid("a").indexFailureCount("3").status(v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }).index("3").trackingFinalizer().Pod, + wantNewIndexFailureCount: 3, + wantNewIndexIgnoredFailureCount: 1, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() + gotNewIndexFailureCount, gotNewIndexIgnoredFailureCount := getNewIndexFailureCounts(logger, &tc.job, tc.pod) + if diff := cmp.Diff(tc.wantNewIndexFailureCount, 
gotNewIndexFailureCount); diff != "" { + t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantNewIndexIgnoredFailureCount, gotNewIndexIgnoredFailureCount); diff != "" { + t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff) + } + }) + } +} + func TestIntervalsHaveIndex(t *testing.T) { cases := map[string]struct { intervals orderedIntervals @@ -267,6 +696,7 @@ func TestFirstPendingIndexes(t *testing.T) { completions int activePods []indexPhase succeededIndexes []interval + failedIndexes *orderedIntervals want []int }{ "cnt greater than completions": { @@ -310,12 +740,24 @@ func TestFirstPendingIndexes(t *testing.T) { completions: 20, want: []int{0, 1, 6, 7, 10}, }, + "with failed indexes": { + activePods: []indexPhase{ + {"3", v1.PodPending}, + {"9", v1.PodPending}, + }, + succeededIndexes: []interval{{1, 1}, {5, 5}, {9, 9}}, + failedIndexes: &orderedIntervals{{2, 2}, {6, 7}}, + cnt: 5, + completions: 20, + want: []int{0, 4, 8, 10, 11}, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { jobCtx := &syncJobCtx{ activePods: hollowPodsWithIndexPhase(tc.activePods), succeededIndexes: tc.succeededIndexes, + failedIndexes: tc.failedIndexes, } got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions) if diff := cmp.Diff(tc.want, got); diff != "" { @@ -446,6 +888,47 @@ func TestPodGenerateNameWithIndex(t *testing.T) { } } +func TestGetIndexFailureCount(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) + cases := map[string]struct { + pod *v1.Pod + wantResult int32 + }{ + "no annotation": { + pod: &v1.Pod{}, + wantResult: 0, + }, + "valid value": { + pod: buildPod().indexFailureCount("2").Pod, + wantResult: 2, + }, + "valid maxint32 value": { + pod: buildPod().indexFailureCount(strconv.Itoa(math.MaxInt32)).Pod, + wantResult: math.MaxInt32, + }, + "too large value": { + pod: buildPod().indexFailureCount(strconv.Itoa(math.MaxInt32 + 1)).Pod, + wantResult: 0, + }, + "negative value": { + pod: buildPod().indexFailureCount("-1").Pod, + wantResult: 0, + }, + "invalid int value": { + pod: buildPod().indexFailureCount("xyz").Pod, + wantResult: 0, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + gotResult := getIndexFailureCount(logger, tc.pod) + if diff := cmp.Equal(tc.wantResult, gotResult); !diff { + t.Errorf("Unexpected result. 
want: %d, got: %d", tc.wantResult, gotResult) + } + }) + } +} + func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod { pods := make([]*v1.Pod, 0, len(descs)) for _, desc := range descs { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index c7379131ffe..078ef2a7e6c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -132,16 +132,18 @@ type Controller struct { } type syncJobCtx struct { - job *batch.Job - pods []*v1.Pod - finishedCondition *batch.JobCondition - activePods []*v1.Pod - succeeded int32 - prevSucceededIndexes orderedIntervals - succeededIndexes orderedIntervals - newBackoffRecord backoffRecord - expectedRmFinalizers sets.Set[string] - uncounted *uncountedTerminatedPods + job *batch.Job + pods []*v1.Pod + finishedCondition *batch.JobCondition + activePods []*v1.Pod + succeeded int32 + prevSucceededIndexes orderedIntervals + succeededIndexes orderedIntervals + failedIndexes *orderedIntervals + newBackoffRecord backoffRecord + expectedRmFinalizers sets.Set[string] + uncounted *uncountedTerminatedPods + podsWithDelayedDeletionPerIndex map[int]*v1.Pod } // NewController creates a new Job controller that keeps the relevant pods @@ -835,6 +837,17 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if isIndexedJob(&job) { jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods) jobCtx.succeeded = int32(jobCtx.succeededIndexes.total()) + if hasBackoffLimitPerIndex(&job) { + jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods) + if jobCtx.finishedCondition == nil { + if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) { + jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "MaxFailedIndexesExceeded", "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now()) + } else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) { + jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "FailedIndexes", "Job has failed indexes", jm.clock.Now()) + } + } + jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) + } } suspendCondChanged := false // Remove active pods if Job failed. @@ -1017,9 +1030,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job continue } considerPodFailed := isPodFailed(pod, jobCtx.job) - if podutil.IsPodTerminal(pod) || considerPodFailed || jobCtx.finishedCondition != nil || jobCtx.job.DeletionTimestamp != nil { - podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod) + if !canRemoveFinalizer(logger, jobCtx, pod, considerPodFailed) { + continue } + podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod) if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) { if isIndexed { // The completion index is enough to avoid recounting succeeded pods. 
@@ -1073,6 +1087,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job } jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total()) jobCtx.job.Status.CompletedIndexes = succeededIndexesStr + var failedIndexesStr *string + if jobCtx.failedIndexes != nil { + failedIndexesStr = pointer.String(jobCtx.failedIndexes.String()) + } + if !pointer.StringEqual(jobCtx.job.Status.FailedIndexes, failedIndexesStr) { + jobCtx.job.Status.FailedIndexes = failedIndexesStr + needsFlush = true + } } if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget { @@ -1106,6 +1128,32 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job return nil } +// canRemoveFinalizer determines if the pod's finalizer can be safely removed. +// The finalizer can be removed when: +// - the entire Job is terminating or finished; or +// - the pod succeeded; or +// - the pod is considered failed, unless its removal is delayed in order to +// transfer the JobIndexFailureCount annotations to the replacement pod. +func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool { + if jobCtx.job.DeletionTimestamp != nil || jobCtx.finishedCondition != nil || pod.Status.Phase == v1.PodSucceeded { + return true + } + if !considerPodFailed { + return false + } + if hasBackoffLimitPerIndex(jobCtx.job) { + if index := getCompletionIndex(pod.Annotations); index != unknownCompletionIndex { + if p, ok := jobCtx.podsWithDelayedDeletionPerIndex[index]; ok && p.UID == pod.UID { + logger.V(3).Info("Delaying pod finalizer removal to await pod recreation within the index", "pod", klog.KObj(pod)) + return false + } + } + } + return true +} + // flushUncountedAndRemoveFinalizers does: // 1. flush the Job status that might include new uncounted Pod UIDs. Also flush the interim FailureTarget condition // if present.
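// Illustrative sketch (not part of the original patch): the per-index branch
// of canRemoveFinalizer above, extracted for readability; the helper name is
// hypothetical. A failed pod keeps its finalizer only while it is the
// delayed-deletion pod for its index, so that its failure-count annotations
// can be transferred to the replacement pod.
func keepsFinalizerForReplacement(jobCtx *syncJobCtx, pod *v1.Pod) bool {
	ix := getCompletionIndex(pod.Annotations)
	if ix == unknownCompletionIndex {
		return false
	}
	p, ok := jobCtx.podsWithDelayedDeletionPerIndex[ix]
	return ok && p.UID == pod.UID
}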
@@ -1443,7 +1491,11 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn } if active < wantActive { - remainingTime := jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff) + var remainingTime time.Duration + if !hasBackoffLimitPerIndex(job) { + // We compute the global remaining time for pod creation when backoffLimitPerIndex is not used. + remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff) + } if remainingTime > 0 { jm.enqueueSyncJobWithDelay(logger, job, remainingTime) return 0, metrics.JobSyncActionPodsCreated, nil @@ -1456,6 +1508,13 @@ var indexesToAdd []int if isIndexedJob(job) { indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions)) + if hasBackoffLimitPerIndex(job) { + indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex) + if remainingTime > 0 { + jm.enqueueSyncJobWithDelay(logger, job, remainingTime) + return 0, metrics.JobSyncActionPodsCreated, nil + } + } diff = int32(len(indexesToAdd)) } @@ -1502,6 +1561,9 @@ } template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex) generateName = podGenerateNameWithIndex(job.Name, completionIndex) + if hasBackoffLimitPerIndex(job) { + addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex]) + } } defer wait.Done() err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName) @@ -1544,6 +1606,26 @@ return active, metrics.JobSyncActionTracking, nil } +// getPodCreationInfoForIndependentIndexes returns the sub-list of the given +// indexes for which pods can be created now. If no index is ready yet, it +// returns the minimal remaining per-index backoff time across all indexes. +func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) { + var indexesToAddNow []int + var minRemainingTimePerIndex *time.Duration + for _, indexToAdd := range indexesToAdd { + if remainingTimePerIndex := getRemainingTimePerIndex(logger, jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff, podsWithDelayedDeletionPerIndex[indexToAdd]); remainingTimePerIndex == 0 { + indexesToAddNow = append(indexesToAddNow, indexToAdd) + } else if minRemainingTimePerIndex == nil || remainingTimePerIndex < *minRemainingTimePerIndex { + minRemainingTimePerIndex = &remainingTimePerIndex + } + } + if len(indexesToAddNow) > 0 { + return indexesToAddNow, 0 + } + return indexesToAddNow, pointer.DurationDeref(minRemainingTimePerIndex, 0) +} + // activePodsForRemoval returns Pods that should be removed because there // are too many pods running or, if this is an indexed job, there are repeated // indexes or invalid indexes or some pods don't have indexes. @@ -1735,7 +1817,7 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch. // now out of range (i.e. index >= spec.Completions).
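// Illustrative note (not part of the original patch): the metric delta below
// is derived from interval totals; e.g. CompletedIndexes "0-2,5" parses to
// [{0,2},{5,5}], whose total() is 4.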
if isIndexedJob(job) { if job.Status.CompletedIndexes != oldCounters.CompletedIndexes { - diff = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total() + diff = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - parseIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total() } } else { diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded) diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 80e5fadadae..e533300e3de 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "math" "sort" "strconv" "testing" @@ -1128,6 +1129,9 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { wantStatusUpdates []batch.JobStatus wantSucceededPodsMetric int wantFailedPodsMetric int + + // features + enableJobBackoffLimitPerIndex bool }{ "no updates": {}, "new active": { @@ -1649,9 +1653,91 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, wantFailedPodsMetric: 2, }, + "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32(6), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, + }, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + FailedIndexes: pointer.String(""), + }, + }, + }, + "indexed job with a failed pod which is recreated by a running pod; the pod is counted": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32(6), + BackoffLimitPerIndex: pointer.Int32(1), + }, + Status: batch.JobStatus{ + Active: 1, + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod, + buildPod().uid("a2").phase(v1.PodRunning).indexFailureCount("1").trackingFinalizer().index("1").Pod, + }, + wantRmFinalizers: 1, + wantStatusUpdates: []batch.JobStatus{ + { + Active: 1, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a1"}, + }, + FailedIndexes: pointer.String(""), + }, + { + Active: 1, + Failed: 1, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + FailedIndexes: pointer.String(""), + }, + }, + wantFailedPodsMetric: 1, + }, + "indexed job with a failed pod for a failed index; the pod is counted": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32(6), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().index("1").Pod, + }, + wantRmFinalizers: 1, + wantStatusUpdates: []batch.JobStatus{ + { + FailedIndexes: pointer.String("1"), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Failed: []types.UID{"a"}, + }, + }, + { + Failed: 1, + FailedIndexes: pointer.String("1"), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + 
wantFailedPodsMetric: 1, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} @@ -1666,20 +1752,22 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if job.Status.UncountedTerminatedPods == nil { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } - uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) - var succeededIndexes orderedIntervals - if isIndexedJob(job) { - succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) - } jobCtx := &syncJobCtx{ job: job, pods: tc.pods, - succeededIndexes: succeededIndexes, - uncounted: uncounted, + uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), expectedRmFinalizers: tc.expectedRmFinalizers, finishedCondition: tc.finishedCond, - newBackoffRecord: backoffRecord{}, } + if isIndexedJob(job) { + jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) + if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil { + jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods) + jobCtx.activePods = controller.FilterActivePods(logger, tc.pods) + jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx) + } + } + err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) @@ -3123,6 +3211,484 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { } } +func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + now := time.Now() + validObjectMeta := metav1.ObjectMeta{ + Name: "foobar", + UID: uuid.NewUUID(), + Namespace: metav1.NamespaceDefault, + } + validSelector := &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + } + validTemplate := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Image: "foo/bar"}, + }, + }, + } + + testCases := map[string]struct { + enableJobBackoffLimitPerIndex bool + enableJobPodFailurePolicy bool + job batch.Job + pods []v1.Pod + wantStatus batch.JobStatus + }{ + "successful job after a single failure within index": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a1").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("a2").index("0").phase(v1.PodSucceeded).indexFailureCount("1").trackingFinalizer().Pod, + 
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 2, + CompletedIndexes: "0,1", + FailedIndexes: pointer.String(""), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: v1.ConditionTrue, + }, + }, + }, + }, + "single failed pod, not counted as the replacement pod creation is delayed": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 2, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + FailedIndexes: pointer.String(""), + }, + }, + "single failed pod replaced already": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("b").index("0").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 2, + Failed: 1, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + FailedIndexes: pointer.String(""), + }, + }, + "single failed index due to exceeding the backoff limit per index, the job continues": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 1, + Failed: 1, + FailedIndexes: pointer.String("0"), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + "single failed index due to FailIndex action, the job continues": { + enableJobBackoffLimitPerIndex: true, + enableJobPodFailurePolicy: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + 
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{3}, + }, + }, + }, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").status(v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 3, + }, + }, + }, + }, + }).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 1, + Failed: 1, + FailedIndexes: pointer.String("0"), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + "job failed index due to FailJob action": { + enableJobBackoffLimitPerIndex: true, + enableJobPodFailurePolicy: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(6), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailJob, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{3}, + }, + }, + }, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").status(v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "x", + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 3, + }, + }, + }, + }, + }).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 0, + Failed: 1, + FailedIndexes: pointer.String(""), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: v1.ConditionTrue, + Reason: "PodFailurePolicy", + Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", + }, + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: "PodFailurePolicy", + Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0", + }, + }, + }, + }, + "job pod failure ignored due to matching Ignore action": { + enableJobBackoffLimitPerIndex: true, + enableJobPodFailurePolicy: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(6), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + PodFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionIgnore, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{3}, + }, + }, + }, + }, + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").status(v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "x", + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 3, + }, + }, + }, + }, + }).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 2, + Failed: 0, + 
FailedIndexes: pointer.String(""), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + "job failed due to exceeding backoffLimit before backoffLimitPerIndex": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(1), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 2, + Succeeded: 0, + FailedIndexes: pointer.String(""), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: "BackoffLimitExceeded", + Message: "Job has reached the specified backoff limit", + }, + }, + }, + }, + "job failed due to failed indexes": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 1, + Succeeded: 1, + FailedIndexes: pointer.String("0"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: "FailedIndexes", + Message: "Job has failed indexes", + }, + }, + }, + }, + "job failed due to exceeding max failed indexes": { + enableJobBackoffLimitPerIndex: true, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(4), + Completions: pointer.Int32(4), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + MaxFailedIndexes: pointer.Int32(1), + }, + }, + pods: []v1.Pod{ + *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod, + *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod, + *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Failed: 3, + Succeeded: 1, + FailedIndexes: pointer.String("0,2"), + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: v1.ConditionTrue, + Reason: "MaxFailedIndexesExceeded", + Message: "Job has 
exceeded the specified maximal number of failed indexes", + }, + }, + }, + }, + "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": { + enableJobBackoffLimitPerIndex: false, + job: batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: pointer.Int32(3), + Completions: pointer.Int32(3), + BackoffLimit: pointer.Int32(math.MaxInt32), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + Status: batch.JobStatus{ + FailedIndexes: pointer.String("0"), + CompletedIndexes: "1", + }, + }, + pods: []v1.Pod{ + *buildPod().uid("c").index("2").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod, + }, + wantStatus: batch.JobStatus{ + Active: 2, + Succeeded: 1, + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + fakeClock := clocktesting.NewFakeClock(now) + manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + job := &tc.job + + actual := job + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { + actual = job + return job, nil + } + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + for i, pod := range tc.pods { + pod := pod + pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job) + if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion { + pb.index(fmt.Sprintf("%v", getCompletionIndex(pod.Annotations))) + } + pb = pb.trackingFinalizer() + sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod) + } + + manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + + // validate relevant fields of the status + if diff := cmp.Diff(tc.wantStatus, actual.Status, + cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"), + cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" { + t.Errorf("unexpected job status. 
Diff: %s\n", diff) + } + }) + } +} + func TestSyncJobUpdateRequeue(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) @@ -3217,6 +3783,69 @@ func TestUpdateJobRequeue(t *testing.T) { } } +func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + now := time.Now() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + cases := map[string]struct { + indexesToAdd []int + podsWithDelayedDeletionPerIndex map[int]*v1.Pod + wantIndexesToAdd []int + wantRemainingTime time.Duration + }{ + "simple index creation": { + indexesToAdd: []int{1, 3}, + wantIndexesToAdd: []int{1, 3}, + }, + "subset of indexes can be recreated now": { + indexesToAdd: []int{1, 3}, + podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ + 1: buildPod().indexFailureCount("0").index("1").customDeletionTimestamp(now).Pod, + }, + wantIndexesToAdd: []int{3}, + }, + "subset of indexes can be recreated now as the pods failed a long time ago": { + indexesToAdd: []int{1, 3}, + podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ + 1: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, + 3: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-DefaultJobPodFailureBackOff)).Pod, + }, + wantIndexesToAdd: []int{3}, + }, + "no indexes can be recreated now, need to wait default pod failure backoff": { + indexesToAdd: []int{1, 2, 3}, + podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ + 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now).Pod, + 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod, + 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now).Pod, + }, + wantRemainingTime: DefaultJobPodFailureBackOff, + }, + "no indexes can be recreated now, need to wait but 1s already passed": { + indexesToAdd: []int{1, 2, 3}, + podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{ + 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now.Add(-time.Second)).Pod, + 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-time.Second)).Pod, + 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now.Add(-time.Second)).Pod, + }, + wantRemainingTime: DefaultJobPodFailureBackOff - time.Second, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + fakeClock := clocktesting.NewFakeClock(now) + manager, _ := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) + gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex) + if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" { + t.Fatalf("Unexpected indexes to add: %s", diff) + } + if diff := cmp.Diff(tc.wantRemainingTime, gotRemainingTime); diff != "" { + t.Fatalf("Unexpected remaining time: %s", diff) + } + }) + } +} + func TestJobPodLookup(t *testing.T) { _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) @@ -4541,10 +5170,27 @@ func (pb podBuilder) clearLabels() podBuilder { } func (pb podBuilder) index(ix string) podBuilder { + return
pb.annotation(batch.JobCompletionIndexAnnotation, ix) +} + +func (pb podBuilder) indexFailureCount(count string) podBuilder { + return pb.annotation(batch.JobIndexFailureCountAnnotation, count) +} + +func (pb podBuilder) indexIgnoredFailureCount(count string) podBuilder { + return pb.annotation(batch.JobIndexIgnoredFailureCountAnnotation, count) +} + +func (pb podBuilder) annotation(key, value string) podBuilder { if pb.Annotations == nil { pb.Annotations = make(map[string]string) } - pb.Annotations[batch.JobCompletionIndexAnnotation] = ix + pb.Annotations[key] = value + return pb +} + +func (pb podBuilder) status(s v1.PodStatus) podBuilder { + pb.Status = s return pb } @@ -4568,6 +5214,15 @@ func (pb podBuilder) deletionTimestamp() podBuilder { return pb } +func (pb podBuilder) customDeletionTimestamp(t time.Time) podBuilder { + pb.DeletionTimestamp = &metav1.Time{Time: t} + return pb +} + +func completionModePtr(m batch.CompletionMode) *batch.CompletionMode { + return &m +} + func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() { origVal := *val *val = newVal diff --git a/pkg/controller/job/pod_failure_policy.go b/pkg/controller/job/pod_failure_policy.go index eb66161ae0a..98f7ed96241 100644 --- a/pkg/controller/job/pod_failure_policy.go +++ b/pkg/controller/job/pod_failure_policy.go @@ -21,20 +21,24 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // matchPodFailurePolicy returns information about matching a given failed pod // against the pod failure policy rules. The information is represented as an -// optional job failure message (present in case the pod matched a 'FailJob' -// rule), a boolean indicating if the failure should be counted towards -// backoffLimit (it should not be counted if the pod matched an 'Ignore' rule), -// and a pointer to the matched pod failure policy action. +// - optional job failure message (present in case the pod matched a 'FailJob' rule), +// - a boolean indicating if the failure should be counted towards backoffLimit +// (and backoffLimitPerIndex if specified). It should not be counted +// if the pod matched an 'Ignore' rule, +// - a pointer to the matched pod failure policy action. 
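+//
+// For illustration, a hypothetical caller handling a failed pod that matches
+// an 'Ignore' rule would observe (values assumed for the example):
+//
+//	msg, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, failedPod)
+//	// msg == nil, countFailed == false, *action == batch.PodFailurePolicyActionIgnore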
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool, *batch.PodFailurePolicyAction) { if podFailurePolicy == nil { return nil, true, nil } ignore := batch.PodFailurePolicyActionIgnore failJob := batch.PodFailurePolicyActionFailJob + failIndex := batch.PodFailurePolicyActionFailIndex count := batch.PodFailurePolicyActionCount for index, podFailurePolicyRule := range podFailurePolicy.Rules { if podFailurePolicyRule.OnExitCodes != nil { @@ -42,6 +46,10 @@ func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod * switch podFailurePolicyRule.Action { case batch.PodFailurePolicyActionIgnore: return nil, false, &ignore + case batch.PodFailurePolicyActionFailIndex: + if feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) { + return nil, true, &failIndex + } case batch.PodFailurePolicyActionCount: return nil, true, &count case batch.PodFailurePolicyActionFailJob: @@ -55,6 +63,10 @@ func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod * switch podFailurePolicyRule.Action { case batch.PodFailurePolicyActionIgnore: return nil, false, &ignore + case batch.PodFailurePolicyActionFailIndex: + if feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) { + return nil, true, &failIndex + } case batch.PodFailurePolicyActionCount: return nil, true, &count case batch.PodFailurePolicyActionFailJob: diff --git a/pkg/controller/job/pod_failure_policy_test.go b/pkg/controller/job/pod_failure_policy_test.go index a9f2052ec7a..28e6b6d8900 100644 --- a/pkg/controller/job/pod_failure_policy_test.go +++ b/pkg/controller/job/pod_failure_policy_test.go @@ -23,7 +23,10 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" ) @@ -34,14 +37,16 @@ func TestMatchPodFailurePolicy(t *testing.T) { } ignore := batch.PodFailurePolicyActionIgnore failJob := batch.PodFailurePolicyActionFailJob + failIndex := batch.PodFailurePolicyActionFailIndex count := batch.PodFailurePolicyActionCount testCases := map[string]struct { - podFailurePolicy *batch.PodFailurePolicy - failedPod *v1.Pod - wantJobFailureMessage *string - wantCountFailed bool - wantAction *batch.PodFailurePolicyAction + enableJobBackoffLimitPerIndex bool + podFailurePolicy *batch.PodFailurePolicy + failedPod *v1.Pod + wantJobFailureMessage *string + wantCountFailed bool + wantAction *batch.PodFailurePolicyAction }{ "unknown action for rule matching by exit codes - skip rule with unknown action": { podFailurePolicy: &batch.PodFailurePolicy{ @@ -292,6 +297,68 @@ func TestMatchPodFailurePolicy(t *testing.T) { wantJobFailureMessage: nil, wantCountFailed: true, }, + "FailIndex rule matched for exit codes; JobBackoffLimitPerIndex enabled": { + enableJobBackoffLimitPerIndex: true, + podFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{1, 2, 3}, + }, + }, + }, + }, + failedPod: &v1.Pod{ + ObjectMeta: validPodObjectMeta, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: 
&v1.ContainerStateTerminated{ + ExitCode: 2, + }, + }, + }, + }, + }, + }, + wantCountFailed: true, + wantAction: &failIndex, + }, + "FailIndex rule matched for exit codes; JobBackoffLimitPerIndex disabled": { + enableJobBackoffLimitPerIndex: false, + podFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{ + Operator: batch.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{1, 2, 3}, + }, + }, + }, + }, + failedPod: &v1.Pod{ + ObjectMeta: validPodObjectMeta, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 2, + }, + }, + }, + }, + }, + }, + wantCountFailed: true, + wantAction: nil, + }, "pod failure policy with NotIn operator and value 0": { podFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ { @@ -406,6 +473,66 @@ func TestMatchPodFailurePolicy(t *testing.T) { wantCountFailed: true, wantAction: &count, }, + "FailIndex rule matched for pod conditions; JobBackoffLimitPerIndex enabled": { + enableJobBackoffLimitPerIndex: true, + podFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + failedPod: &v1.Pod{ + ObjectMeta: validPodObjectMeta, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + wantCountFailed: true, + wantAction: &failIndex, + }, + "FailIndex rule matched for pod conditions; JobBackoffLimitPerIndex disabled": { + enableJobBackoffLimitPerIndex: false, + podFailurePolicy: &batch.PodFailurePolicy{ + Rules: []batch.PodFailurePolicyRule{ + { + Action: batch.PodFailurePolicyActionFailIndex, + OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + failedPod: &v1.Pod{ + ObjectMeta: validPodObjectMeta, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + wantCountFailed: true, + wantAction: nil, + }, "ignore rule matched for pod conditions": { podFailurePolicy: &batch.PodFailurePolicy{ Rules: []batch.PodFailurePolicyRule{ @@ -709,6 +836,7 @@ func TestMatchPodFailurePolicy(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)() jobFailMessage, countFailed, action := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod) if diff := cmp.Diff(tc.wantJobFailureMessage, jobFailMessage); diff != "" { t.Errorf("Unexpected job failure message: %s", diff) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index f3cfeebbf57..c2607a296bd 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -682,6 +682,633 @@ func TestJobPodFailurePolicy(t *testing.T) { } } +// TestBackoffLimitPerIndex_DelayedPodDeletion tests that pod deletion is delayed +// until the replacement pod is created, so that the replacement pod has the +// index-failure-count annotation bumped when BackoffLimitPerIndex is used. +func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) + + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer func() { + cancel() + }() + + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(1), + Completions: pointer.Int32(1), + BackoffLimitPerIndex: pointer.Int32(1), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", pointer.String("")) + + // First pod from index 0 failed. + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil { + t.Fatal("Failed trying to fail pod with index 0") + } + // Delete the failed pod + pod, err := getJobPodForIndex(ctx, clientSet, jobObj, 0, func(_ *v1.Pod) bool { return true }) + if err != nil { + t.Fatalf("failed to get terminal pod for index: %v", 0) + } + if err := clientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("failed to delete pod: %v, error: %v", klog.KObj(pod), err) + } + + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Failed: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", pointer.String("")) + + // Verify the replacement pod is created and has the index-failure-count + // annotation bumped. + replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, 0) + if err != nil { + t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", 0, err) + } + gotIndexFailureCount, err := getIndexFailureCount(replacement) + if err != nil { + t.Fatalf("Failed to read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err) + } + if diff := cmp.Diff(1, gotIndexFailureCount); diff != "" { + t.Errorf("Unexpected index failure count for the replacement pod: %s", diff) + } + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil { + t.Fatal("Failed trying to succeed pod with index 0") + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 0, + Succeeded: 1, + Failed: 1, + Ready: pointer.Int32(0), + }) + validateJobSucceeded(ctx, t, clientSet, jobObj) +} + +// TestBackoffLimitPerIndex_Reenabling tests handling of pod failures when +// reenabling the BackoffLimitPerIndex feature.
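+// Assumed scenario (reconstructed from the test body below):
+// - feature enabled: index 0 fails and is recorded in failedIndexes
+// - feature disabled: index 1 fails, failedIndexes is cleared, and the index is retried
+// - feature reenabled: index 2 fails and is recorded in failedIndexes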
+func TestBackoffLimitPerIndex_Reenabling(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) + + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + resetMetrics() + + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(3), + Completions: pointer.Int32(3), + BackoffLimitPerIndex: pointer.Int32(0), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + }, + }) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 3, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", pointer.String("")) + + // First pod from index 0 failed + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil { + t.Fatal("Failed trying to fail pod with index 0") + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Failed: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2), "", pointer.String("0")) + + // Disable the feature + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, false)() + + // First pod from index 1 failed + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatal("Failed trying to fail pod with index 1") + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 3, + Failed: 2, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil) + + // Reenable the feature + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + + // First pod from index 2 failed + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { + t.Fatal("Failed trying to fail pod with index 2") + } + + // Verify the indexes 0 and 1 are active as the failed pods don't have + // finalizers at this point, so they are ignored. + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Failed: 3, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("2")) + + // Mark remaining pods as Succeeded and verify Job status + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err) + } + validateJobFailed(ctx, t, clientSet, jobObj) + validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) +} + +// TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff tests that the +// pods are recreated with exponential backoff delay computed independently +// per index.
Scenario: +// - fail index 0 +// - fail index 0 +// - fail index 1 +// - succeed index 0 +// - fail index 1 +// - succeed index 1 +func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second)) + + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ + Spec: batchv1.JobSpec{ + Completions: pointer.Int32(2), + Parallelism: pointer.Int32(2), + BackoffLimitPerIndex: pointer.Int32(2), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + }, + }) + if err != nil { + t.Fatalf("Could not create job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("")) + + // Fail the first pod for index 0 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Failed: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("")) + + // Fail the second pod for index 0 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Failed: 2, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("")) + + // Fail the first pod for index 1 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 2, + Failed: 3, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("")) + + // Succeed the third pod for index 0 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Failed: 3, + Succeeded: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", pointer.String("")) + + // Fail the second pod for index 1 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Failed: 4, + Succeeded: 1, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", pointer.String("")) + + // Succeed the third pod for index 1 + if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 0, 
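+ // Both indexes succeeded on their third attempt: 2 pods succeeded and 4 failed in total.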
+ Failed: 4, + Succeeded: 2, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0,1", pointer.String("")) + validateJobSucceeded(ctx, t, clientSet, jobObj) + + for index := 0; index < int(*jobObj.Spec.Completions); index++ { + podsForIndex, err := getJobPodsForIndex(ctx, clientSet, jobObj, index, func(_ *v1.Pod) bool { return true }) + if err != nil { + t.Fatalf("Failed to list job %q pods for index %v, error: %v", klog.KObj(jobObj), index, err) + } + validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, podsForIndex) + } +} + +// TestBackoffLimitPerIndex tests handling of job and its pods when +// backoff limit per index is used. +func TestBackoffLimitPerIndex(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) + + type podTerminationWithExpectations struct { + index int + status v1.PodStatus + wantActive int + wantFailed int + wantSucceeded int + wantActiveIndexes sets.Set[int] + wantCompletedIndexes string + wantFailedIndexes *string + wantReplacementPodFailureCount *int + } + + podTemplateSpec := v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main-container", + Image: "foo", + ImagePullPolicy: v1.PullIfNotPresent, + TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, + }, + }, + }, + } + testCases := map[string]struct { + job batchv1.Job + podTerminations []podTerminationWithExpectations + wantJobConditionType batchv1.JobConditionType + }{ + "job succeeded": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + Template: podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 2, + wantFailed: 1, + wantActiveIndexes: sets.New(0, 1), + wantFailedIndexes: pointer.String(""), + wantReplacementPodFailureCount: pointer.Int(1), + }, + }, + wantJobConditionType: batchv1.JobComplete, + }, + "job index fails due to exceeding backoff limit per index": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(2), + Template: podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 2, + wantFailed: 1, + wantActiveIndexes: sets.New(0, 1), + wantFailedIndexes: pointer.String(""), + wantReplacementPodFailureCount: pointer.Int(1), + }, + { + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 2, + wantFailed: 2, + wantActiveIndexes: sets.New(0, 1), + wantFailedIndexes: pointer.String(""), + wantReplacementPodFailureCount: pointer.Int(2), + }, + { + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 1, + wantFailed: 3, + wantActiveIndexes: sets.New(1), + wantFailedIndexes: pointer.String("0"), + }, + }, + wantJobConditionType: batchv1.JobFailed, + }, + "job index fails due to exceeding the global backoff limit first": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(3), + Completions: pointer.Int32(3), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + BackoffLimit: pointer.Int32(2), + Template: 
podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + index: 0, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 3, + wantFailed: 1, + wantActiveIndexes: sets.New(0, 1, 2), + wantFailedIndexes: pointer.String(""), + }, + { + index: 1, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 3, + wantFailed: 2, + wantActiveIndexes: sets.New(0, 1, 2), + wantFailedIndexes: pointer.String(""), + }, + { + index: 2, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantFailed: 5, + wantFailedIndexes: pointer.String(""), + }, + }, + wantJobConditionType: batchv1.JobFailed, + }, + "job continues execution after a failed index, the job is marked Failed due to the failed index": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(0), + Template: podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + index: 0, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 1, + wantFailed: 1, + wantActiveIndexes: sets.New(1), + wantFailedIndexes: pointer.String("0"), + }, + { + index: 1, + status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + wantFailed: 1, + wantSucceeded: 1, + wantFailedIndexes: pointer.String("0"), + wantCompletedIndexes: "1", + }, + }, + wantJobConditionType: batchv1.JobFailed, + }, + "job execution terminated early due to exceeding max failed indexes": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(3), + Completions: pointer.Int32(3), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(0), + MaxFailedIndexes: pointer.Int32(1), + Template: podTemplateSpec, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + index: 0, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 2, + wantFailed: 1, + wantActiveIndexes: sets.New(1, 2), + wantFailedIndexes: pointer.String("0"), + }, + { + index: 1, + status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + wantActive: 0, + wantFailed: 3, + wantFailedIndexes: pointer.String("0,1"), + }, + }, + wantJobConditionType: batchv1.JobFailed, + }, + "pod failure matching pod failure policy rule with FailIndex action": { + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Parallelism: pointer.Int32(2), + Completions: pointer.Int32(2), + CompletionMode: completionModePtr(batchv1.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + Template: podTemplateSpec, + PodFailurePolicy: &batchv1.PodFailurePolicy{ + Rules: []batchv1.PodFailurePolicyRule{ + { + Action: batchv1.PodFailurePolicyActionFailIndex, + OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{ + Operator: batchv1.PodFailurePolicyOnExitCodesOpIn, + Values: []int32{13}, + }, + }, + { + Action: batchv1.PodFailurePolicyActionFailIndex, + OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + }, + }, + podTerminations: []podTerminationWithExpectations{ + { + index: 0, + status: v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + ExitCode: 13, + }, + }, + }, + }, + }, + wantActive: 1, + wantFailed: 1, + wantActiveIndexes: sets.New(1), + wantFailedIndexes: pointer.String("0"), + }, + { + index: 1, + status: v1.PodStatus{ + Phase: 
v1.PodFailed, + Conditions: []v1.PodCondition{ + { + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + wantFailed: 2, + wantFailedIndexes: pointer.String("0,1"), + }, + }, + wantJobConditionType: batchv1.JobFailed, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + resetMetrics() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)() + + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer func() { + cancel() + }() + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: int(*test.job.Spec.Parallelism), + Ready: pointer.Int32(0), + }) + for _, podTermination := range test.podTerminations { + pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index) + if err != nil { + t.Fatalf("listing Job Pods: %q", err) + } + pod.Status = podTermination.status + if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { + t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: podTermination.wantActive, + Succeeded: podTermination.wantSucceeded, + Failed: podTermination.wantFailed, + Ready: pointer.Int32(0), + }) + validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes) + if podTermination.wantReplacementPodFailureCount != nil { + replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index) + if err != nil { + t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", podTermination.index, err) + } + gotReplacementPodFailureCount, err := getIndexFailureCount(replacement) + if err != nil { + t.Fatalf("Failed to read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err) + } + if *podTermination.wantReplacementPodFailureCount != gotReplacementPodFailureCount { + t.Fatalf("Unexpected value of the index failure count annotation. Want: %v, got: %v", *podTermination.wantReplacementPodFailureCount, gotReplacementPodFailureCount) + } + } + } + + remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive + if remainingActive > 0 { + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil { + t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err) + } + } + validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType) + validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) + }) + } +} + +func getIndexFailureCount(p *v1.Pod) (int, error) { + if p.Annotations == nil { + return 0, errors.New("no annotations found") + } + v, ok := p.Annotations[batchv1.JobIndexFailureCountAnnotation] + if !ok { + return 0, fmt.Errorf("annotation %s not found", batchv1.JobIndexFailureCountAnnotation) + } + return strconv.Atoi(v) +} + +func completionModePtr(cm batchv1.CompletionMode) *batchv1.CompletionMode { + return &cm +} + // TestNonParallelJob tests that a Job that only executes one Pod.
The test // recreates the Job controller at some points to make sure a new controller // is able to pickup. @@ -999,7 +1626,7 @@ func TestIndexedJob(t *testing.T) { Active: 3, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil) // One Pod succeeds. if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { @@ -1010,7 +1637,7 @@ func TestIndexedJob(t *testing.T) { Succeeded: 1, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil) // One Pod fails, which should be recreated. if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { @@ -1022,7 +1649,7 @@ func TestIndexedJob(t *testing.T) { Succeeded: 1, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil) // Remaining Pods succeed. if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { @@ -1034,7 +1661,7 @@ func TestIndexedJob(t *testing.T) { Succeeded: 4, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3") + validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil) validateJobSucceeded(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateTerminatedPodsTrackingFinalizerMetric(t, 5) @@ -1208,7 +1835,7 @@ func TestElasticIndexedJob(t *testing.T) { Failed: update.wantFailed, Ready: pointer.Int32(0), }) - validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes) + validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil) } validateJobSucceeded(ctx, t, clientSet, jobObj) @@ -1424,10 +2051,14 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { if len(jobPods) != 3 { t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods)) } + validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, jobPods) +} +func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time.Duration, pods []*v1.Pod) { + t.Helper() creationTime := []time.Time{} finishTime := []time.Time{} - for _, pod := range jobPods { + for _, pod := range pods { creationTime = append(creationTime, pod.CreationTimestamp.Time) if len(pod.Status.ContainerStatuses) > 0 { finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time) @@ -1441,25 +2072,24 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { return finishTime[i].Before(finishTime[j]) }) - if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobPodFailureBackOff.Seconds() { - t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobPodFailureBackOff) + diff := creationTime[1].Sub(finishTime[0]) + + if diff < defaultPodFailureBackoff { + t.Fatalf("Second pod should be created at least %v after the first pod, time difference: %v", defaultPodFailureBackoff, diff) } - if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() { - t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobPodFailureBackOff) +
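// The upper bound rules out double counting: the second pod must be created within twice the backoff. +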
if diff >= 2*defaultPodFailureBackoff { + t.Fatalf("Second pod should be created before %v after the first pod, time difference: %v", 2*defaultPodFailureBackoff, diff) } - diff := creationTime[2].Sub(finishTime[1]).Seconds() + diff = creationTime[2].Sub(finishTime[1]) - // The third pod should not be created before 4 seconds - if diff < 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() { - t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobPodFailureBackOff) + if diff < 2*defaultPodFailureBackoff { + t.Fatalf("Third pod should be created at least %v after the second pod, time difference: %v", 2*defaultPodFailureBackoff, diff) } - // The third pod should be created within 8 seconds - // This check rules out double counting - if diff >= 4*jobcontroller.DefaultJobPodFailureBackOff.Seconds() { - t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobPodFailureBackOff) + if diff >= 4*defaultPodFailureBackoff { + t.Fatalf("Third pod should be created before %v after the second pod, time difference: %v", 4*defaultPodFailureBackoff, diff) } } @@ -1815,7 +2445,7 @@ func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSe // validateIndexedJobPods validates indexes and hostname of // active and completed Pods of an Indexed Job. // Call after validateJobPodsStatus -func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string) { +func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string, wantFailed *string) { t.Helper() updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { @@ -1824,6 +2454,9 @@ if updatedJob.Status.CompletedIndexes != gotCompleted { t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted) } + if diff := cmp.Diff(wantFailed, updatedJob.Status.FailedIndexes); diff != "" { + t.Errorf("Got unexpected failed indexes: %s", diff) + } pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { t.Fatalf("Failed to list Job Pods: %v", err) @@ -2005,6 +2638,17 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job } if pix, err := getCompletionIndex(&pod); err == nil && pix == ix { pod.Status.Phase = phase + if phase == v1.PodFailed || phase == v1.PodSucceeded { + pod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: metav1.Now(), + }, + }, + }, + } + } _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("updating pod %s status: %w", pod.Name, err) @@ -2015,6 +2659,44 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job return errors.New("no pod matching index found") } +func getActivePodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int) (*v1.Pod, error) { + return getJobPodForIndex(ctx, clientSet, jobObj, ix, func(p *v1.Pod) bool { + return !podutil.IsPodTerminal(p) + }) +} + +func getJobPodForIndex(ctx context.Context, clientSet clientset.Interface,
jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) (*v1.Pod, error) { + pods, err := getJobPodsForIndex(ctx, clientSet, jobObj, ix, filter) + if err != nil { + return nil, err + } + if len(pods) == 0 { + return nil, fmt.Errorf("pod not found for index: %v", ix) + } + return pods[0], nil +} + +func getJobPodsForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) ([]*v1.Pod, error) { + pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("listing Job Pods: %w", err) + } + var result []*v1.Pod + for _, pod := range pods.Items { + pod := pod + if !metav1.IsControlledBy(&pod, jobObj) { + continue + } + if !filter(&pod) { + continue + } + if pix, err := getCompletionIndex(&pod); err == nil && pix == ix { + result = append(result, &pod) + } + } + return result, nil +} + func getCompletionIndex(p *v1.Pod) (int, error) { if p.Annotations == nil { return 0, errors.New("no annotations found")