Job controller implementation of backoff limit per index (#118009)

Michał Woźniak 2023-07-18 22:44:11 +02:00 committed by GitHub
parent f55f2785e2
commit a15c27661e
9 changed files with 2345 additions and 73 deletions
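For orientation, the feature wired up in this diff lets an Indexed Job bound retries per completion index via spec.backoffLimitPerIndex and optionally cap the number of failed indexes via spec.maxFailedIndexes; both fields are referenced throughout the hunks below. A minimal Go sketch of a Job spec exercising the feature; the field names come from the diff itself, while the image, counts and limits are illustrative assumptions:

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

func main() {
	indexed := batchv1.IndexedCompletion
	job := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "per-index-backoff"},
		Spec: batchv1.JobSpec{
			CompletionMode: &indexed,
			Completions:    pointer.Int32(10),
			Parallelism:    pointer.Int32(10),
			// Each index may fail at most twice before the index is marked failed.
			BackoffLimitPerIndex: pointer.Int32(2),
			// The whole Job fails once more than 3 indexes have failed.
			MaxFailedIndexes: pointer.Int32(3),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers:    []corev1.Container{{Name: "main", Image: "busybox"}},
				},
			},
		},
	}
	fmt.Println(job.Name)
}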


@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
@ -213,12 +214,31 @@ func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
}
func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
if backoff.failuresAfterLastSuccess == 0 {
return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, backoff.failuresAfterLastSuccess, backoff.lastFailureTime)
}
// getRemainingTimePerIndex returns the backoff time remaining before a
// replacement pod can be created for a given index. The number of consecutive pod failures for the
// index is retrieved from the `job-index-failure-count` annotation of the
// last failed pod within the index (represented by `lastFailedPod`).
// The last failed pod is also used to determine the time of the last failure.
func getRemainingTimePerIndex(logger klog.Logger, clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, lastFailedPod *v1.Pod) time.Duration {
if lastFailedPod == nil {
// There is no previous failed pod for this index
return time.Duration(0)
}
failureCount := getIndexAbsoluteFailureCount(logger, lastFailedPod) + 1
lastFailureTime := getFinishedTime(lastFailedPod)
return getRemainingTimeForFailuresCount(clock, defaultBackoff, maxBackoff, failureCount, &lastFailureTime)
}
func getRemainingTimeForFailuresCount(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration, failuresCount int32, lastFailureTime *time.Time) time.Duration {
if failuresCount == 0 {
return 0
}
backoffDuration := defaultBackoff
for i := 1; i < int(backoff.failuresAfterLastSuccess); i++ {
for i := 1; i < int(failuresCount); i++ {
backoffDuration = backoffDuration * 2
if backoffDuration >= maxBackoff {
backoffDuration = maxBackoff
@ -226,7 +246,7 @@ func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBac
}
}
timeElapsedSinceLastFailure := clock.Since(*backoff.lastFailureTime)
timeElapsedSinceLastFailure := clock.Since(*lastFailureTime)
if backoffDuration < timeElapsedSinceLastFailure {
return 0
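For reference, the per-index delay computed above is plain exponential backoff: defaultBackoff doubled failuresCount-1 times, capped at maxBackoff, minus the time elapsed since the last failure. The hunk is truncated before the final return; the sketch below assumes the remainder is backoffDuration minus the elapsed time, as in the pre-existing getRemainingTime. A self-contained illustration of that arithmetic, not the controller code itself:

package main

import (
	"fmt"
	"time"
)

// remaining mirrors getRemainingTimeForFailuresCount: exponential backoff
// capped at maxBackoff, reduced by the time since the last failure.
func remaining(defaultBackoff, maxBackoff time.Duration, failures int, sinceLastFailure time.Duration) time.Duration {
	if failures == 0 {
		return 0
	}
	d := defaultBackoff
	for i := 1; i < failures; i++ {
		d *= 2
		if d >= maxBackoff {
			d = maxBackoff
			break
		}
	}
	if d < sinceLastFailure {
		return 0
	}
	return d - sinceLastFailure
}

func main() {
	// With a 10s base and three failures, the backoff is 40s; 15s already
	// elapsed leaves 25s before the replacement pod for the index is created.
	fmt.Println(remaining(10*time.Second, 360*time.Second, 3, 15*time.Second))
}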


@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
@ -466,3 +467,46 @@ func TestGetRemainingBackoffTime(t *testing.T) {
})
}
}
func TestGetRemainingBackoffTimePerIndex(t *testing.T) {
defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))
testCases := map[string]struct {
currentTime time.Time
maxBackoff time.Duration
defaultBackoff time.Duration
lastFailedPod *v1.Pod
wantDuration time.Duration
}{
"no failures": {
lastFailedPod: nil,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 0 * time.Second,
},
"two prev failures; current time and failure time are same": {
lastFailedPod: buildPod().phase(v1.PodFailed).indexFailureCount("2").customDeletionTimestamp(defaultTestTime.Time).Pod,
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 20 * time.Second,
},
"one prev failure counted and one ignored; current time and failure time are same": {
lastFailedPod: buildPod().phase(v1.PodFailed).indexFailureCount("1").indexIgnoredFailureCount("1").customDeletionTimestamp(defaultTestTime.Time).Pod,
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 20 * time.Second,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
fakeClock := clocktesting.NewFakeClock(tc.currentTime.Truncate(time.Second))
d := getRemainingTimePerIndex(logger, fakeClock, tc.defaultBackoff, tc.maxBackoff, tc.lastFailedPod)
if d.Seconds() != tc.wantDuration.Seconds() {
t.Errorf("Expected value of duration %v; got %v", tc.wantDuration, d)
}
})
}
}


@ -18,6 +18,7 @@ package job
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
@ -41,6 +42,10 @@ func isIndexedJob(job *batch.Job) bool {
return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion
}
func hasBackoffLimitPerIndex(job *batch.Job) bool {
return feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) && job.Spec.BackoffLimitPerIndex != nil
}
type interval struct {
First int
Last int
@ -54,7 +59,7 @@ type orderedIntervals []interval
// empty list if this Job is not tracked with finalizers. The new list includes
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
prevIntervals := succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
prevIntervals := parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
newSucceeded := sets.New[int]()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
@ -69,9 +74,55 @@ func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Po
return prevIntervals, result
}
// calculateFailedIndexes returns the list of failed indexes in compressed
// format (intervals). The list includes indexes already present in
// .status.failedIndexes and indexes that failed since the last sync.
func calculateFailedIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) *orderedIntervals {
var prevIntervals orderedIntervals
if job.Status.FailedIndexes != nil {
prevIntervals = parseIndexesFromString(logger, *job.Status.FailedIndexes, int(*job.Spec.Completions))
}
newFailed := sets.New[int]()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
// Failed pod with a valid index that still has the finalizer (meaning it is not counted yet).
if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && hasJobTrackingFinalizer(p) && isIndexFailed(logger, job, p) {
newFailed.Insert(ix)
}
}
// List returns the items of the set in order.
result := prevIntervals.withOrderedIndexes(sets.List(newFailed))
return &result
}
func isIndexFailed(logger klog.Logger, job *batch.Job, pod *v1.Pod) bool {
isPodFailedCounted := false
if isPodFailed(pod, job) {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
_, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
if action != nil && *action == batch.PodFailurePolicyActionFailIndex {
return true
}
isPodFailedCounted = countFailed
} else {
isPodFailedCounted = true
}
}
return isPodFailedCounted && getIndexFailureCount(logger, pod) >= *job.Spec.BackoffLimitPerIndex
}
// withOrderedIndexes returns a new list of ordered intervals that contains
// the newIndexes, provided in increasing order.
func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals {
newIndexIntervals := make(orderedIntervals, len(newIndexes))
for i, newIndex := range newIndexes {
newIndexIntervals[i] = interval{newIndex, newIndex}
}
return oi.merge(newIndexIntervals)
}
// merge returns a new list of ordered intervals that contains the intervals from both oi and newOi.
func (oi orderedIntervals) merge(newOi orderedIntervals) orderedIntervals {
var result orderedIntervals
i := 0
j := 0
@ -84,12 +135,12 @@ func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals
lastInterval.Last = thisInterval.Last
}
}
for i < len(oi) && j < len(newIndexes) {
if oi[i].First < newIndexes[j] {
for i < len(oi) && j < len(newOi) {
if oi[i].First < newOi[j].First {
appendOrMergeWithLastInterval(oi[i])
i++
} else {
appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
appendOrMergeWithLastInterval(newOi[j])
j++
}
}
@ -97,8 +148,8 @@ func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals
appendOrMergeWithLastInterval(oi[i])
i++
}
for j < len(newIndexes) {
appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
for j < len(newOi) {
appendOrMergeWithLastInterval(newOi[j])
j++
}
return result
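The merge above walks two already-sorted, non-overlapping interval lists and coalesces adjacent or overlapping intervals via appendOrMergeWithLastInterval (whose body is outside the hunk). A self-contained illustration of that behavior; it is not the controller's merge, and the coalescing rule is inferred from how succeeded and failed indexes are compressed elsewhere in the diff:

package main

import "fmt"

type interval struct{ First, Last int }

// mergeIntervals illustrates orderedIntervals.merge: both inputs are sorted,
// non-overlapping interval lists, and adjacent or overlapping intervals in
// the result are coalesced.
func mergeIntervals(a, b []interval) []interval {
	var out []interval
	push := func(iv interval) {
		if n := len(out); n > 0 && out[n-1].Last+1 >= iv.First {
			if iv.Last > out[n-1].Last {
				out[n-1].Last = iv.Last
			}
			return
		}
		out = append(out, iv)
	}
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if a[i].First < b[j].First {
			push(a[i])
			i++
		} else {
			push(b[j])
			j++
		}
	}
	for ; i < len(a); i++ {
		push(a[i])
	}
	for ; j < len(b); j++ {
		push(b[j])
	}
	return out
}

func main() {
	// {1-3, 5} merged with {4, 7} coalesces into {1-5, 7}.
	fmt.Println(mergeIntervals(
		[]interval{{1, 3}, {5, 5}},
		[]interval{{4, 4}, {7, 7}},
	))
}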
@ -150,19 +201,19 @@ func (oi orderedIntervals) has(ix int) bool {
return oi[hi].First <= ix
}
func succeededIndexesFromString(logger klog.Logger, completedIndexes string, completions int) orderedIntervals {
if completedIndexes == "" {
func parseIndexesFromString(logger klog.Logger, indexesStr string, completions int) orderedIntervals {
if indexesStr == "" {
return nil
}
var result orderedIntervals
var lastInterval *interval
for _, intervalStr := range strings.Split(completedIndexes, ",") {
for _, intervalStr := range strings.Split(indexesStr, ",") {
limitsStr := strings.Split(intervalStr, "-")
var inter interval
var err error
inter.First, err = strconv.Atoi(limitsStr[0])
if err != nil {
logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
continue
}
if inter.First >= completions {
@ -171,7 +222,7 @@ func succeededIndexesFromString(logger klog.Logger, completedIndexes string, com
if len(limitsStr) > 1 {
inter.Last, err = strconv.Atoi(limitsStr[1])
if err != nil {
logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
logger.Info("Corrupted indexes interval, ignoring", "interval", intervalStr, "err", err)
continue
}
if inter.Last >= completions {
@ -191,20 +242,17 @@ func succeededIndexesFromString(logger klog.Logger, completedIndexes string, com
}
// firstPendingIndexes returns `count` indexes less than `completions` that are
// not covered by `activePods` or `succeededIndexes`.
// not covered by `activePods`, `succeededIndexes` or `failedIndexes`.
func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
if count == 0 {
return nil
}
active := sets.New[int]()
for _, p := range jobCtx.activePods {
ix := getCompletionIndex(p.Annotations)
if ix != unknownCompletionIndex {
active.Insert(ix)
}
}
active := getIndexes(jobCtx.activePods)
result := make([]int, 0, count)
nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
if jobCtx.failedIndexes != nil {
nonPending = nonPending.merge(*jobCtx.failedIndexes)
}
// The following algorithm is bounded by len(nonPending) and count.
candidate := 0
for _, sInterval := range nonPending {
@ -221,6 +269,18 @@ func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
return result
}
// getIndexes returns the set of completion indexes of the given pods
func getIndexes(pods []*v1.Pod) sets.Set[int] {
result := sets.New[int]()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
if ix != unknownCompletionIndex {
result.Insert(ix)
}
}
return result
}
// appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated
// completion indexes. For each index, it selects n-1 pods for removal, where n
// is the number of repetitions. The pods to be removed are appended to `rm`,
@ -248,6 +308,69 @@ func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions i
return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex)
}
// getPodsWithDelayedDeletionPerIndex returns, for each index, the failed pod
// whose removal is delayed while awaiting recreation. The map is used when
// BackoffLimitPerIndex is enabled to delay pod finalizer removal, and thus pod
// deletion, until the replacement pod is created. Delaying the deletion allows
// the replacement pod to have its batch.kubernetes.io/job-index-failure-count
// annotation set properly, keeping track of the number of failed pods within
// the index.
func getPodsWithDelayedDeletionPerIndex(logger klog.Logger, jobCtx *syncJobCtx) map[int]*v1.Pod {
// the failed pods corresponding to currently active indexes can be safely
// deleted as the failure count annotation is present in the currently
// active pods.
activeIndexes := getIndexes(jobCtx.activePods)
podsWithDelayedDeletionPerIndex := make(map[int]*v1.Pod)
getValidPodsWithFilter(jobCtx, nil, func(p *v1.Pod) bool {
if isPodFailed(p, jobCtx.job) {
if ix := getCompletionIndex(p.Annotations); ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) {
if jobCtx.succeededIndexes.has(ix) || jobCtx.failedIndexes.has(ix) || activeIndexes.Has(ix) {
return false
}
if lastPodWithDelayedDeletion, ok := podsWithDelayedDeletionPerIndex[ix]; ok {
if getIndexAbsoluteFailureCount(logger, lastPodWithDelayedDeletion) <= getIndexAbsoluteFailureCount(logger, p) && !getFinishedTime(p).Before(getFinishedTime(lastPodWithDelayedDeletion)) {
podsWithDelayedDeletionPerIndex[ix] = p
}
} else {
podsWithDelayedDeletionPerIndex[ix] = p
}
}
}
return false
})
return podsWithDelayedDeletionPerIndex
}
func addIndexFailureCountAnnotation(logger klog.Logger, template *v1.PodTemplateSpec, job *batch.Job, podBeingReplaced *v1.Pod) {
indexFailureCount, indexIgnoredFailureCount := getNewIndexFailureCounts(logger, job, podBeingReplaced)
template.Annotations[batch.JobIndexFailureCountAnnotation] = strconv.Itoa(int(indexFailureCount))
if indexIgnoredFailureCount > 0 {
template.Annotations[batch.JobIndexIgnoredFailureCountAnnotation] = strconv.Itoa(int(indexIgnoredFailureCount))
}
}
// getNewIndexFailureCounts returns the values of the index-failure-count and
// index-ignored-failure-count annotations for the new pod being created
func getNewIndexFailureCounts(logger klog.Logger, job *batch.Job, podBeingReplaced *v1.Pod) (int32, int32) {
if podBeingReplaced != nil {
indexFailureCount := parseIndexFailureCountAnnotation(logger, podBeingReplaced)
indexIgnoredFailureCount := parseIndexFailureIgnoreCountAnnotation(logger, podBeingReplaced)
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, podBeingReplaced)
if countFailed {
indexFailureCount++
} else {
indexIgnoredFailureCount++
}
} else {
indexFailureCount++
}
return indexFailureCount, indexIgnoredFailureCount
}
return 0, 0
}
func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) {
if ix == unknownCompletionIndex {
rm = append(rm, pods...)
@ -281,6 +404,49 @@ func getCompletionIndex(annotations map[string]string) int {
return i
}
// getIndexFailureCount returns the value of the batch.kubernetes.io/job-index-failure-count
// annotation as int32. It falls back to 0 when:
// - there is no annotation - for example the pod was created while the BackoffLimitPerIndex
// feature was temporarily disabled, or the annotation was manually removed by the user,
// - the value of the annotation isn't parsable as an int - for example because
// it was set by a malicious user,
// - the value of the annotation is negative or greater than math.MaxInt32 - for example
// because it was set by a malicious user.
func getIndexFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
return parseIndexFailureCountAnnotation(logger, pod)
}
func getIndexAbsoluteFailureCount(logger klog.Logger, pod *v1.Pod) int32 {
return parseIndexFailureCountAnnotation(logger, pod) + parseIndexFailureIgnoreCountAnnotation(logger, pod)
}
func parseIndexFailureCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
if value, ok := pod.Annotations[batch.JobIndexFailureCountAnnotation]; ok {
return parseInt32(logger, value)
}
logger.V(3).Info("There is no expected annotation", "annotationKey", batch.JobIndexFailureCountAnnotation, "pod", klog.KObj(pod), "podUID", pod.UID)
return 0
}
func parseIndexFailureIgnoreCountAnnotation(logger klog.Logger, pod *v1.Pod) int32 {
if value, ok := pod.Annotations[batch.JobIndexIgnoredFailureCountAnnotation]; ok {
return parseInt32(logger, value)
}
return 0
}
func parseInt32(logger klog.Logger, vStr string) int32 {
if vInt, err := strconv.Atoi(vStr); err != nil {
logger.Error(err, "Failed to parse the value", "value", vStr)
return 0
} else if vInt < 0 || vInt > math.MaxInt32 {
logger.Info("The value is invalid", "value", vInt)
return 0
} else {
return int32(vInt)
}
}
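To make the two counters concrete: job-index-failure-count tracks failures counted against backoffLimitPerIndex, job-index-ignored-failure-count tracks failures skipped by an Ignore pod failure policy rule, and the "absolute" count used for backoff is their sum. A hedged sketch reading both annotations from a pod; the annotation constants are the ones used in this diff, the pod and its values are illustrative, and note that the controller's parseInt32 additionally rejects values above math.MaxInt32:

package main

import (
	"fmt"
	"strconv"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// annotationInt reads an annotation as a non-negative int, defaulting to 0
// on missing, unparsable, or negative values.
func annotationInt(pod *corev1.Pod, key string) int {
	v, err := strconv.Atoi(pod.Annotations[key])
	if err != nil || v < 0 {
		return 0
	}
	return v
}

func main() {
	pod := &corev1.Pod{}
	pod.Annotations = map[string]string{
		batchv1.JobIndexFailureCountAnnotation:        "2",
		batchv1.JobIndexIgnoredFailureCountAnnotation: "1",
	}
	counted := annotationInt(pod, batchv1.JobIndexFailureCountAnnotation)
	ignored := annotationInt(pod, batchv1.JobIndexIgnoredFailureCountAnnotation)
	// counted=2 is compared against backoffLimitPerIndex; counted+ignored=3
	// drives the per-index backoff delay (the "absolute" failure count).
	fmt.Println(counted, ignored, counted+ignored)
}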
func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) {
for i := range template.Spec.InitContainers {
addCompletionIndexEnvVariable(&template.Spec.InitContainers[i])


@ -17,12 +17,20 @@ limitations under the License.
package job
import (
"math"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
)
@ -219,6 +227,427 @@ func TestCalculateSucceededIndexes(t *testing.T) {
}
}
func TestIsIndexFailed(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cases := map[string]struct {
enableJobPodFailurePolicy bool
job batch.Job
pod *v1.Pod
wantResult bool
}{
"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=0": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pod: buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
wantResult: true,
},
"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=1": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pod: buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
wantResult: true,
},
"matching FailIndex pod failure policy; JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true,
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{3},
},
},
},
},
},
},
pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 3,
},
},
},
},
}).index("0").trackingFinalizer().Pod,
wantResult: true,
},
"matching FailIndex pod failure policy; JobPodFailurePolicy disabled": {
enableJobPodFailurePolicy: false,
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{3},
},
},
},
},
},
},
pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 3,
},
},
},
},
}).index("0").trackingFinalizer().Pod,
wantResult: false,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
gotResult := isIndexFailed(logger, &tc.job, tc.pod)
if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" {
t.Errorf("Unexpected result (-want,+got):\n%s", diff)
}
})
}
}
func TestCalculateFailedIndexes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cases := map[string]struct {
enableJobPodFailurePolicy bool
job batch.Job
pods []*v1.Pod
wantPrevFailedIndexes orderedIntervals
wantFailedIndexes orderedIntervals
}{
"one new index failed": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
},
wantFailedIndexes: []interval{{1, 1}},
},
"pod without finalizer is ignored": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
},
wantFailedIndexes: nil,
},
"pod outside completions is ignored": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("3").Pod,
},
wantFailedIndexes: nil,
},
"extend the failed indexes": {
job: batch.Job{
Status: batch.JobStatus{
FailedIndexes: pointer.String("0"),
},
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
},
wantFailedIndexes: []interval{{0, 1}},
},
"prev failed indexes empty": {
job: batch.Job{
Status: batch.JobStatus{
FailedIndexes: pointer.String(""),
},
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
},
wantFailedIndexes: []interval{{1, 1}},
},
"prev failed indexes outside the completions": {
job: batch.Job{
Status: batch.JobStatus{
FailedIndexes: pointer.String("9"),
},
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
},
wantFailedIndexes: []interval{{1, 1}},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
if diff := cmp.Diff(&tc.wantFailedIndexes, failedIndexes); diff != "" {
t.Errorf("Unexpected failed indexes (-want,+got):\n%s", diff)
}
})
}
}
func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
now := time.Now()
cases := map[string]struct {
enableJobPodFailurePolicy bool
job batch.Job
pods []*v1.Pod
expectedRmFinalizers sets.Set[string]
wantPodsWithDelayedDeletionPerIndex []string
}{
"failed pods are kept corresponding to non-failed indexes are kept": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(3),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().uid("b").indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
buildPod().uid("c").indexFailureCount("0").phase(v1.PodFailed).index("2").trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{"a", "c"},
},
"failed pod without finalizer; the pod's deletion is not delayed as it already started": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"failed pod with expected finalizer removal; the pod's deletion is not delayed as it already started": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
},
expectedRmFinalizers: sets.New("a"),
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"failed pod with index outside of completions; the pod's deletion is not delayed": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(0),
},
},
pods: []*v1.Pod{
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("4").trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"failed pod for active index; the pod's deletion is not delayed as it is already replaced": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().uid("a2").indexFailureCount("1").phase(v1.PodRunning).index("0").trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"failed pod for succeeded index; the pod's deletion is not delayed as it is already replaced": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().uid("a2").indexFailureCount("1").phase(v1.PodSucceeded).index("0").trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"multiple failed pods for index with different failure count; only the pod with highest failure count is kept": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(4),
},
},
pods: []*v1.Pod{
buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().uid("a3").indexFailureCount("2").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
},
"multiple failed pods for index with different finish times; only the last failed pod is kept": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(4),
},
},
pods: []*v1.Pod{
buildPod().uid("a1").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-time.Second)).trackingFinalizer().Pod,
buildPod().uid("a3").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now).trackingFinalizer().Pod,
buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-2 * time.Second)).trackingFinalizer().Pod,
},
wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
activePods := controller.FilterActivePods(logger, tc.pods)
failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
_, succeededIndexes := calculateSucceededIndexes(logger, &tc.job, tc.pods)
jobCtx := &syncJobCtx{
job: &tc.job,
pods: tc.pods,
activePods: activePods,
succeededIndexes: succeededIndexes,
failedIndexes: failedIndexes,
expectedRmFinalizers: tc.expectedRmFinalizers,
}
gotPodsWithDelayedDeletionPerIndex := getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
gotPodsWithDelayedDeletionPerIndexSet := sets.New[string]()
for _, pod := range gotPodsWithDelayedDeletionPerIndex {
gotPodsWithDelayedDeletionPerIndexSet.Insert(string(pod.UID))
}
if diff := cmp.Diff(tc.wantPodsWithDelayedDeletionPerIndex, sets.List(gotPodsWithDelayedDeletionPerIndexSet)); diff != "" {
t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff)
}
})
}
}
func TestGetNewIndexFailureCountValue(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cases := map[string]struct {
enableJobPodFailurePolicy bool
job batch.Job
pod *v1.Pod
wantNewIndexFailureCount int32
wantNewIndexIgnoredFailureCount int32
}{
"first pod created": {
job: batch.Job{},
wantNewIndexFailureCount: 0,
},
"failed pod being replaced with 0 index failure count": {
job: batch.Job{},
pod: buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
wantNewIndexFailureCount: 1,
},
"failed pod being replaced with >0 index failure count": {
job: batch.Job{},
pod: buildPod().uid("a").indexFailureCount("3").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
wantNewIndexFailureCount: 4,
},
"failed pod being replaced, matching the ignore rule; JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true,
job: batch.Job{
Spec: batch.JobSpec{
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionIgnore,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
},
},
pod: buildPod().uid("a").indexFailureCount("3").status(v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
}).index("3").trackingFinalizer().Pod,
wantNewIndexFailureCount: 3,
wantNewIndexIgnoredFailureCount: 1,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
gotNewIndexFailureCount, gotNewIndexIgnoredFailureCount := getNewIndexFailureCounts(logger, &tc.job, tc.pod)
if diff := cmp.Diff(tc.wantNewIndexFailureCount, gotNewIndexFailureCount); diff != "" {
t.Errorf("Unexpected index failure count (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantNewIndexIgnoredFailureCount, gotNewIndexIgnoredFailureCount); diff != "" {
t.Errorf("Unexpected ignored index failure count (-want,+got):\n%s", diff)
}
})
}
}
func TestIntervalsHaveIndex(t *testing.T) {
cases := map[string]struct {
intervals orderedIntervals
@ -267,6 +696,7 @@ func TestFirstPendingIndexes(t *testing.T) {
completions int
activePods []indexPhase
succeededIndexes []interval
failedIndexes *orderedIntervals
want []int
}{
"cnt greater than completions": {
@ -310,12 +740,24 @@ func TestFirstPendingIndexes(t *testing.T) {
completions: 20,
want: []int{0, 1, 6, 7, 10},
},
"with failed indexes": {
activePods: []indexPhase{
{"3", v1.PodPending},
{"9", v1.PodPending},
},
succeededIndexes: []interval{{1, 1}, {5, 5}, {9, 9}},
failedIndexes: &orderedIntervals{{2, 2}, {6, 7}},
cnt: 5,
completions: 20,
want: []int{0, 4, 8, 10, 11},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
jobCtx := &syncJobCtx{
activePods: hollowPodsWithIndexPhase(tc.activePods),
succeededIndexes: tc.succeededIndexes,
failedIndexes: tc.failedIndexes,
}
got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
if diff := cmp.Diff(tc.want, got); diff != "" {
@ -446,6 +888,47 @@ func TestPodGenerateNameWithIndex(t *testing.T) {
}
}
func TestGetIndexFailureCount(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cases := map[string]struct {
pod *v1.Pod
wantResult int32
}{
"no annotation": {
pod: &v1.Pod{},
wantResult: 0,
},
"valid value": {
pod: buildPod().indexFailureCount("2").Pod,
wantResult: 2,
},
"valid maxint32 value": {
pod: buildPod().indexFailureCount(strconv.Itoa(math.MaxInt32)).Pod,
wantResult: math.MaxInt32,
},
"too large value": {
pod: buildPod().indexFailureCount(strconv.Itoa(math.MaxInt32 + 1)).Pod,
wantResult: 0,
},
"negative value": {
pod: buildPod().indexFailureCount("-1").Pod,
wantResult: 0,
},
"invalid int value": {
pod: buildPod().indexFailureCount("xyz").Pod,
wantResult: 0,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
gotResult := getIndexFailureCount(logger, tc.pod)
if diff := cmp.Equal(tc.wantResult, gotResult); !diff {
t.Errorf("Unexpected result. want: %d, got: %d", tc.wantResult, gotResult)
}
})
}
}
func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
pods := make([]*v1.Pod, 0, len(descs))
for _, desc := range descs {


@ -132,16 +132,18 @@ type Controller struct {
}
type syncJobCtx struct {
job *batch.Job
pods []*v1.Pod
finishedCondition *batch.JobCondition
activePods []*v1.Pod
succeeded int32
prevSucceededIndexes orderedIntervals
succeededIndexes orderedIntervals
newBackoffRecord backoffRecord
expectedRmFinalizers sets.Set[string]
uncounted *uncountedTerminatedPods
job *batch.Job
pods []*v1.Pod
finishedCondition *batch.JobCondition
activePods []*v1.Pod
succeeded int32
prevSucceededIndexes orderedIntervals
succeededIndexes orderedIntervals
failedIndexes *orderedIntervals
newBackoffRecord backoffRecord
expectedRmFinalizers sets.Set[string]
uncounted *uncountedTerminatedPods
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
}
// NewController creates a new Job controller that keeps the relevant pods
@ -835,6 +837,17 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if isIndexedJob(&job) {
jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
jobCtx.succeeded = int32(jobCtx.succeededIndexes.total())
if hasBackoffLimitPerIndex(&job) {
jobCtx.failedIndexes = calculateFailedIndexes(logger, &job, pods)
if jobCtx.finishedCondition == nil {
if job.Spec.MaxFailedIndexes != nil && jobCtx.failedIndexes.total() > int(*job.Spec.MaxFailedIndexes) {
jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "MaxFailedIndexesExceeded", "Job has exceeded the specified maximal number of failed indexes", jm.clock.Now())
} else if jobCtx.failedIndexes.total() > 0 && jobCtx.failedIndexes.total()+jobCtx.succeededIndexes.total() >= int(*job.Spec.Completions) {
jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "FailedIndexes", "Job has failed indexes", jm.clock.Now())
}
}
jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
}
}
suspendCondChanged := false
// Remove active pods if Job failed.
@ -1017,9 +1030,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
continue
}
considerPodFailed := isPodFailed(pod, jobCtx.job)
if podutil.IsPodTerminal(pod) || considerPodFailed || jobCtx.finishedCondition != nil || jobCtx.job.DeletionTimestamp != nil {
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
if !canRemoveFinalizer(logger, jobCtx, pod, considerPodFailed) {
continue
}
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
if isIndexed {
// The completion index is enough to avoid recounting succeeded pods.
@ -1073,6 +1087,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
}
jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total())
jobCtx.job.Status.CompletedIndexes = succeededIndexesStr
var failedIndexesStr *string
if jobCtx.failedIndexes != nil {
failedIndexesStr = pointer.String(jobCtx.failedIndexes.String())
}
if !pointer.StringEqual(jobCtx.job.Status.FailedIndexes, failedIndexesStr) {
jobCtx.job.Status.FailedIndexes = failedIndexesStr
needsFlush = true
}
}
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
@ -1106,6 +1128,32 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
return nil
}
// canRemoveFinalizer determines if the pod's finalizer can be safely removed.
// The finalizer can be removed when:
// - the entire Job is terminating or already finished; or
// - the pod succeeded; or
// - the pod is considered failed, unless its removal is delayed for the
// purpose of transferring the JobIndexFailureCount annotations to the
// replacement pod. If the entire Job is terminating, the finalizer can be
// removed unconditionally.
func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, considerPodFailed bool) bool {
if jobCtx.job.DeletionTimestamp != nil || jobCtx.finishedCondition != nil || pod.Status.Phase == v1.PodSucceeded {
return true
}
if !considerPodFailed {
return false
}
if hasBackoffLimitPerIndex(jobCtx.job) {
if index := getCompletionIndex(pod.Annotations); index != unknownCompletionIndex {
if p, ok := jobCtx.podsWithDelayedDeletionPerIndex[index]; ok && p.UID == pod.UID {
logger.V(3).Info("Delaying pod finalizer removal to await for pod recreation within the index", "pod", klog.KObj(pod))
return false
}
}
}
return true
}
// flushUncountedAndRemoveFinalizers does:
// 1. flush the Job status that might include new uncounted Pod UIDs. Also flush the interim FailureTarget condition
// if present.
@ -1443,7 +1491,11 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
}
if active < wantActive {
remainingTime := jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
var remainingTime time.Duration
if !hasBackoffLimitPerIndex(job) {
// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
remainingTime = jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
}
if remainingTime > 0 {
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
@ -1456,6 +1508,13 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
var indexesToAdd []int
if isIndexedJob(job) {
indexesToAdd = firstPendingIndexes(jobCtx, int(diff), int(*job.Spec.Completions))
if hasBackoffLimitPerIndex(job) {
indexesToAdd, remainingTime = jm.getPodCreationInfoForIndependentIndexes(logger, indexesToAdd, jobCtx.podsWithDelayedDeletionPerIndex)
if remainingTime > 0 {
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
}
diff = int32(len(indexesToAdd))
}
@ -1502,6 +1561,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
}
template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
generateName = podGenerateNameWithIndex(job.Name, completionIndex)
if hasBackoffLimitPerIndex(job) {
addIndexFailureCountAnnotation(logger, template, job, jobCtx.podsWithDelayedDeletionPerIndex[completionIndex])
}
}
defer wait.Done()
err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
@ -1544,6 +1606,26 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionTracking, nil
}
// getPodCreationInfoForIndependentIndexes returns the sub-list of indexes for
// which pods can be created immediately. If no index is ready to create a pod
// yet, it returns the lowest remaining backoff time across all indexes, after
// which the sync should be retried.
func (jm *Controller) getPodCreationInfoForIndependentIndexes(logger klog.Logger, indexesToAdd []int, podsWithDelayedDeletionPerIndex map[int]*v1.Pod) ([]int, time.Duration) {
var indexesToAddNow []int
var minRemainingTimePerIndex *time.Duration
for _, indexToAdd := range indexesToAdd {
if remainingTimePerIndex := getRemainingTimePerIndex(logger, jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff, podsWithDelayedDeletionPerIndex[indexToAdd]); remainingTimePerIndex == 0 {
indexesToAddNow = append(indexesToAddNow, indexToAdd)
} else if minRemainingTimePerIndex == nil || remainingTimePerIndex < *minRemainingTimePerIndex {
minRemainingTimePerIndex = &remainingTimePerIndex
}
}
if len(indexesToAddNow) > 0 {
return indexesToAddNow, 0
}
return indexesToAddNow, pointer.DurationDeref(minRemainingTimePerIndex, 0)
}
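The helper above effectively partitions the candidate indexes by their remaining per-index backoff: indexes whose backoff has expired get pods immediately, otherwise the Job sync is re-queued after the shortest remaining delay. A small self-contained sketch of that selection under illustrative inputs (the remaining times are given directly rather than derived from pods):

package main

import (
	"fmt"
	"time"
)

// pickReadyIndexes mirrors the selection in getPodCreationInfoForIndependentIndexes:
// return the indexes whose backoff has expired, or, if none has, the shortest
// remaining delay after which the sync should be retried.
func pickReadyIndexes(remaining map[int]time.Duration, candidates []int) ([]int, time.Duration) {
	var ready []int
	var minDelay time.Duration
	for _, ix := range candidates {
		if d := remaining[ix]; d == 0 {
			ready = append(ready, ix)
		} else if minDelay == 0 || d < minDelay {
			minDelay = d
		}
	}
	if len(ready) > 0 {
		return ready, 0
	}
	return nil, minDelay
}

func main() {
	remaining := map[int]time.Duration{2: 12 * time.Second}
	// Indexes 0 and 1 have no pending backoff, so they get pods now; index 2
	// would only be retried 12s later if it were the sole candidate.
	fmt.Println(pickReadyIndexes(remaining, []int{0, 1, 2}))
}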
// activePodsForRemoval returns Pods that should be removed because there
// are too many pods running or, if this is an indexed job, there are repeated
// indexes or invalid indexes or some pods don't have indexes.
@ -1735,7 +1817,7 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.
// now out of range (i.e. index >= spec.Completions).
if isIndexedJob(job) {
if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
diff = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
diff = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - parseIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
}
} else {
diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)


@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sort"
"strconv"
"testing"
@ -1128,6 +1129,9 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
wantStatusUpdates []batch.JobStatus
wantSucceededPodsMetric int
wantFailedPodsMetric int
// features
enableJobBackoffLimitPerIndex bool
}{
"no updates": {},
"new active": {
@ -1649,9 +1653,91 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
wantFailedPodsMetric: 2,
},
"indexed job with a failed pod with delayed finalizer removal; the pod is not counted": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
Spec: batch.JobSpec{
CompletionMode: &indexedCompletion,
Completions: pointer.Int32(6),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod,
},
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
FailedIndexes: pointer.String(""),
},
},
},
"indexed job with a failed pod which is recreated by a running pod; the pod is counted": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
Spec: batch.JobSpec{
CompletionMode: &indexedCompletion,
Completions: pointer.Int32(6),
BackoffLimitPerIndex: pointer.Int32(1),
},
Status: batch.JobStatus{
Active: 1,
},
},
pods: []*v1.Pod{
buildPod().uid("a1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod,
buildPod().uid("a2").phase(v1.PodRunning).indexFailureCount("1").trackingFinalizer().index("1").Pod,
},
wantRmFinalizers: 1,
wantStatusUpdates: []batch.JobStatus{
{
Active: 1,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a1"},
},
FailedIndexes: pointer.String(""),
},
{
Active: 1,
Failed: 1,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
FailedIndexes: pointer.String(""),
},
},
wantFailedPodsMetric: 1,
},
"indexed job with a failed pod for a failed index; the pod is counted": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
Spec: batch.JobSpec{
CompletionMode: &indexedCompletion,
Completions: pointer.Int32(6),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().index("1").Pod,
},
wantRmFinalizers: 1,
wantStatusUpdates: []batch.JobStatus{
{
FailedIndexes: pointer.String("1"),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
},
},
{
Failed: 1,
FailedIndexes: pointer.String("1"),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
},
},
wantFailedPodsMetric: 1,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
@ -1666,20 +1752,22 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
var succeededIndexes orderedIntervals
if isIndexedJob(job) {
succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
}
jobCtx := &syncJobCtx{
job: job,
pods: tc.pods,
succeededIndexes: succeededIndexes,
uncounted: uncounted,
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: tc.expectedRmFinalizers,
finishedCondition: tc.finishedCond,
newBackoffRecord: backoffRecord{},
}
if isIndexedJob(job) {
jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil {
jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods)
jobCtx.activePods = controller.FilterActivePods(logger, tc.pods)
jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
}
}
err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush)
if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr)
@ -3123,6 +3211,484 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
}
}
func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
now := time.Now()
validObjectMeta := metav1.ObjectMeta{
Name: "foobar",
UID: uuid.NewUUID(),
Namespace: metav1.NamespaceDefault,
}
validSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
}
validTemplate := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Image: "foo/bar"},
},
},
}
testCases := map[string]struct {
enableJobBackoffLimitPerIndex bool
enableJobPodFailurePolicy bool
job batch.Job
pods []v1.Pod
wantStatus batch.JobStatus
}{
"successful job after a single failure within index": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a1").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("a2").index("0").phase(v1.PodSucceeded).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 2,
CompletedIndexes: "0,1",
FailedIndexes: pointer.String(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
},
},
},
},
"single failed pod, not counted as the replacement pod creation is delayed": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 2,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
FailedIndexes: pointer.String(""),
},
},
"single failed pod replaced already": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("b").index("0").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 2,
Failed: 1,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
FailedIndexes: pointer.String(""),
},
},
"single failed index due to exceeding the backoff limit per index, the job continues": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 1,
Failed: 1,
FailedIndexes: pointer.String("0"),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
},
},
"single failed index due to FailIndex action, the job continues": {
enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{3},
},
},
},
},
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").status(v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 3,
},
},
},
},
}).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 1,
Failed: 1,
FailedIndexes: pointer.String("0"),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
},
},
"job failed index due to FailJob action": {
enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailJob,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{3},
},
},
},
},
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").status(v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "x",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 3,
},
},
},
},
}).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 0,
Failed: 1,
FailedIndexes: pointer.String(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailureTarget,
Status: v1.ConditionTrue,
Reason: "PodFailurePolicy",
Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0",
},
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "PodFailurePolicy",
Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0",
},
},
},
},
"job pod failure ignored due to matching Ignore action": {
enableJobBackoffLimitPerIndex: true,
enableJobPodFailurePolicy: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(6),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
PodFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionIgnore,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{3},
},
},
},
},
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").status(v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "x",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 3,
},
},
},
},
}).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 2,
Failed: 0,
FailedIndexes: pointer.String(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
},
},
"job failed due to exceeding backoffLimit before backoffLimitPerIndex": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(1),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 2,
Succeeded: 0,
FailedIndexes: pointer.String(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "BackoffLimitExceeded",
Message: "Job has reached the specified backoff limit",
},
},
},
},
"job failed due to failed indexes": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
FailedIndexes: pointer.String("0"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "FailedIndexes",
Message: "Job has failed indexes",
},
},
},
},
"job failed due to exceeding max failed indexes": {
enableJobBackoffLimitPerIndex: true,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(4),
Completions: pointer.Int32(4),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
MaxFailedIndexes: pointer.Int32(1),
},
},
pods: []v1.Pod{
*buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
*buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
*buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Failed: 3,
Succeeded: 1,
FailedIndexes: pointer.String("0,2"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "MaxFailedIndexesExceeded",
Message: "Job has exceeded the specified maximal number of failed indexes",
},
},
},
},
"job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": {
enableJobBackoffLimitPerIndex: false,
job: batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: validObjectMeta,
Spec: batch.JobSpec{
Selector: validSelector,
Template: validTemplate,
Parallelism: pointer.Int32(3),
Completions: pointer.Int32(3),
BackoffLimit: pointer.Int32(math.MaxInt32),
CompletionMode: completionModePtr(batch.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
},
Status: batch.JobStatus{
FailedIndexes: pointer.String("0"),
CompletedIndexes: "1",
},
},
pods: []v1.Pod{
*buildPod().uid("c").index("2").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod,
},
wantStatus: batch.JobStatus{
Active: 2,
Succeeded: 1,
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
fakeClock := clocktesting.NewFakeClock(now)
manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
job := &tc.job
actual := job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
return job, nil
}
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
for i, pod := range tc.pods {
pod := pod
pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job)
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion {
pb.index(fmt.Sprintf("%v", getCompletionIndex(pod.Annotations)))
}
pb = pb.trackingFinalizer()
sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod)
}
manager.syncJob(context.TODO(), testutil.GetKey(job, t))
// validate relevant fields of the status
if diff := cmp.Diff(tc.wantStatus, actual.Status,
cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"),
cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
t.Errorf("unexpected job status. Diff: %s\n", diff)
}
})
}
}
func TestSyncJobUpdateRequeue(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@ -3217,6 +3783,69 @@ func TestUpdateJobRequeue(t *testing.T) {
}
}
func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
now := time.Now()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
cases := map[string]struct {
indexesToAdd []int
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
wantIndexesToAdd []int
wantRemainingTime time.Duration
}{
"simple index creation": {
indexesToAdd: []int{1, 3},
wantIndexesToAdd: []int{1, 3},
},
"subset of indexes can be recreated now": {
indexesToAdd: []int{1, 3},
podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
1: buildPod().indexFailureCount("0").index("1").customDeletionTimestamp(now).Pod,
},
wantIndexesToAdd: []int{3},
},
"subset of indexes can be recreated now as the pods failed long time ago": {
indexesToAdd: []int{1, 3},
podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
1: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod,
3: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-DefaultJobPodFailureBackOff)).Pod,
},
wantIndexesToAdd: []int{3},
},
"no indexes can be recreated now, need to wait default pod failure backoff": {
indexesToAdd: []int{1, 2, 3},
podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
1: buildPod().indexFailureCount("1").customDeletionTimestamp(now).Pod,
2: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod,
3: buildPod().indexFailureCount("2").customDeletionTimestamp(now).Pod,
},
wantRemainingTime: DefaultJobPodFailureBackOff,
},
"no indexes can be recreated now, need to wait but 1s already passed": {
indexesToAdd: []int{1, 2, 3},
podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
1: buildPod().indexFailureCount("1").customDeletionTimestamp(now.Add(-time.Second)).Pod,
2: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-time.Second)).Pod,
3: buildPod().indexFailureCount("2").customDeletionTimestamp(now.Add(-time.Second)).Pod,
},
wantRemainingTime: DefaultJobPodFailureBackOff - time.Second,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
fakeClock := clocktesting.NewFakeClock(now)
manager, _ := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex)
if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" {
t.Fatalf("Unexpected indexes to add: %s", diff)
}
if diff := cmp.Diff(tc.wantRemainingTime, gotRemainingTime); diff != "" {
t.Fatalf("Unexpected remaining time: %s", diff)
}
})
}
}
func TestJobPodLookup(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@ -4541,10 +5170,27 @@ func (pb podBuilder) clearLabels() podBuilder {
}
func (pb podBuilder) index(ix string) podBuilder {
return pb.annotation(batch.JobCompletionIndexAnnotation, ix)
}
func (pb podBuilder) indexFailureCount(count string) podBuilder {
return pb.annotation(batch.JobIndexFailureCountAnnotation, count)
}
func (pb podBuilder) indexIgnoredFailureCount(count string) podBuilder {
return pb.annotation(batch.JobIndexIgnoredFailureCountAnnotation, count)
}
func (pb podBuilder) annotation(key, value string) podBuilder {
if pb.Annotations == nil {
pb.Annotations = make(map[string]string)
}
pb.Annotations[batch.JobCompletionIndexAnnotation] = ix
pb.Annotations[key] = value
return pb
}
func (pb podBuilder) status(s v1.PodStatus) podBuilder {
pb.Status = s
return pb
}
@ -4568,6 +5214,15 @@ func (pb podBuilder) deletionTimestamp() podBuilder {
return pb
}
func (pb podBuilder) customDeletionTimestamp(t time.Time) podBuilder {
pb.DeletionTimestamp = &metav1.Time{Time: t}
return pb
}
func completionModePtr(m batch.CompletionMode) *batch.CompletionMode {
return &m
}
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
origVal := *val
*val = newVal

View File

@ -21,20 +21,24 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// matchPodFailurePolicy returns information about matching a given failed pod
// against the pod failure policy rules. The information is represented as an
// optional job failure message (present in case the pod matched a 'FailJob'
// rule), a boolean indicating if the failure should be counted towards
// backoffLimit (it should not be counted if the pod matched an 'Ignore' rule),
// and a pointer to the matched pod failure policy action.
// - optional job failure message (present in case the pod matched a 'FailJob' rule),
// - a boolean indicating if the failure should be counted towards backoffLimit
// (and backoffLimitPerIndex if specified). It should not be counted
// if the pod matched an 'Ignore' rule,
// - a pointer to the matched pod failure policy action.
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool, *batch.PodFailurePolicyAction) {
if podFailurePolicy == nil {
return nil, true, nil
}
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
failIndex := batch.PodFailurePolicyActionFailIndex
count := batch.PodFailurePolicyActionCount
for index, podFailurePolicyRule := range podFailurePolicy.Rules {
if podFailurePolicyRule.OnExitCodes != nil {
@ -42,6 +46,10 @@ func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *
switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore:
return nil, false, &ignore
case batch.PodFailurePolicyActionFailIndex:
if feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) {
return nil, true, &failIndex
}
case batch.PodFailurePolicyActionCount:
return nil, true, &count
case batch.PodFailurePolicyActionFailJob:
@ -55,6 +63,10 @@ func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *
switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore:
return nil, false, &ignore
case batch.PodFailurePolicyActionFailIndex:
if feature.DefaultFeatureGate.Enabled(features.JobBackoffLimitPerIndex) {
return nil, true, &failIndex
}
case batch.PodFailurePolicyActionCount:
return nil, true, &count
case batch.PodFailurePolicyActionFailJob:

View File

@ -23,7 +23,10 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
)
@ -34,14 +37,16 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
failIndex := batch.PodFailurePolicyActionFailIndex
count := batch.PodFailurePolicyActionCount
testCases := map[string]struct {
podFailurePolicy *batch.PodFailurePolicy
failedPod *v1.Pod
wantJobFailureMessage *string
wantCountFailed bool
wantAction *batch.PodFailurePolicyAction
enableJobBackoffLimitPerIndex bool
podFailurePolicy *batch.PodFailurePolicy
failedPod *v1.Pod
wantJobFailureMessage *string
wantCountFailed bool
wantAction *batch.PodFailurePolicyAction
}{
"unknown action for rule matching by exit codes - skip rule with unknown action": {
podFailurePolicy: &batch.PodFailurePolicy{
@ -292,6 +297,68 @@ func TestMatchPodFailurePolicy(t *testing.T) {
wantJobFailureMessage: nil,
wantCountFailed: true,
},
"FailIndex rule matched for exit codes; JobBackoffLimitPerIndex enabled": {
enableJobBackoffLimitPerIndex: true,
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1, 2, 3},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 2,
},
},
},
},
},
},
wantCountFailed: true,
wantAction: &failIndex,
},
"FailIndex rule matched for exit codes; JobBackoffLimitPerIndex disabled": {
enableJobBackoffLimitPerIndex: false,
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
Operator: batch.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1, 2, 3},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 2,
},
},
},
},
},
},
wantCountFailed: true,
wantAction: nil,
},
"pod failure policy with NotIn operator and value 0": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
@ -406,6 +473,66 @@ func TestMatchPodFailurePolicy(t *testing.T) {
wantCountFailed: true,
wantAction: &count,
},
"FailIndex rule matched for pod conditions; JobBackoffLimitPerIndex enabled": {
enableJobBackoffLimitPerIndex: true,
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
wantCountFailed: true,
wantAction: &failIndex,
},
"FailIndex rule matched for pod conditions; JobBackoffLimitPerIndex disabled": {
enableJobBackoffLimitPerIndex: false,
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
{
Action: batch.PodFailurePolicyActionFailIndex,
OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
failedPod: &v1.Pod{
ObjectMeta: validPodObjectMeta,
Status: v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
wantCountFailed: true,
wantAction: nil,
},
"ignore rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{
Rules: []batch.PodFailurePolicyRule{
@ -709,6 +836,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
jobFailMessage, countFailed, action := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod)
if diff := cmp.Diff(tc.wantJobFailureMessage, jobFailMessage); diff != "" {
t.Errorf("Unexpected job failure message: %s", diff)

View File

@ -682,6 +682,633 @@ func TestJobPodFailurePolicy(t *testing.T) {
}
}
// TestBackoffLimitPerIndex_DelayedPodDeletion tests that the pod deletion is delayed
// until the replacement pod is created, so that the replacement pod has the
// index-failure-count annotation bumped, when BackoffLimitPerIndex is used.
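// The scenario below fails the pod for index 0, deletes it directly, and then
// verifies that the replacement pod already carries the bumped failure count
// before the index is finally allowed to succeed.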
func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(1),
Completions: pointer.Int32(1),
BackoffLimitPerIndex: pointer.Int32(1),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", pointer.String(""))
// First pod from index 0 failed.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
t.Fatal("Failed trying to fail pod with index 0")
}
// Delete the failed pod
pod, err := getJobPodForIndex(ctx, clientSet, jobObj, 0, func(_ *v1.Pod) bool { return true })
if err != nil {
t.Fatalf("failed to get terminal pod for index: %v", 0)
}
if err := clientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("failed to delete pod: %v, error: %v", klog.KObj(pod), err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", pointer.String(""))
// Verify the replacement pod is created and has the index-failure-count
// annotation bumped.
replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, 0)
if err != nil {
t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", 0, err)
}
gotIndexFailureCount, err := getIndexFailureCount(replacement)
if err != nil {
t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
}
if diff := cmp.Diff(1, gotIndexFailureCount); diff != "" {
t.Errorf("Unexpected index failure count for the replacement pod: %s", diff)
}
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
t.Fatal("Failed trying to fail pod with index 0")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 0,
Succeeded: 1,
Failed: 1,
Ready: pointer.Int32(0),
})
validateJobSucceeded(ctx, t, clientSet, jobObj)
}
// TestBackoffLimitPerIndex_Reenabling tests handling of pod failures when
// reenabling the BackoffLimitPerIndex feature.
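// The test fails one pod per index: index 0 with the feature gate enabled,
// index 1 with the gate disabled, and index 2 after re-enabling it. It checks
// that failedIndexes is cleared while the gate is off and, once the gate is
// back on, only reflects failures still tracked via the pods' finalizers.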
func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
resetMetrics()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(3),
Completions: pointer.Int32(3),
BackoffLimitPerIndex: pointer.Int32(0),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", pointer.String(""))
// First pod from index 0 failed
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
t.Fatal("Failed trying to fail pod with index 0")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2), "", pointer.String("0"))
// Disable the feature
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, false)()
// First pod from index 1 failed
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatal("Failed trying to fail pod with index 1")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Failed: 2,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
// Reenable the feature
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
// First pod from index 2 failed
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatal("Failed trying to fail pod with index 2")
}
// Verify that indexes 0 and 1 are active; their failed pods no longer have
// finalizers at this point, so those failures are ignored.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 3,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String("2"))
// Mark the remaining pods as Succeeded and verify the Job status
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
}
validateJobFailed(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}
// TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff tests that the
// pods are recreated with exponential backoff delay computed independently
// per index. Scenario:
// - fail index 0
// - fail index 0
// - fail index 1
// - succeed index 0
// - fail index 1
// - succeed index 1
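// With DefaultJobPodFailureBackOff overridden to 2s below, the replacement
// created after the first failure of an index is expected between 2s and 4s
// after that failure, and the replacement after the second failure between 4s
// and 8s; validateExpotentialBackoffDelay asserts these per-index bounds.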
func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
BackoffLimitPerIndex: pointer.Int32(2),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
},
})
if err != nil {
t.Fatalf("Could not create job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String(""))
// Fail the first pod for index 0
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String(""))
// Fail the second pod for index 0
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 2,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String(""))
// Fail the first pod for index 1
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 3,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", pointer.String(""))
// Succeed the third pod for index 0
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 3,
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", pointer.String(""))
// Fail the second pod for index 1
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 4,
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", pointer.String(""))
// Succeed the third pod for index 1
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 0,
Failed: 4,
Succeeded: 2,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0,1", pointer.String(""))
validateJobSucceeded(ctx, t, clientSet, jobObj)
for index := 0; index < int(*jobObj.Spec.Completions); index++ {
podsForIndex, err := getJobPodsForIndex(ctx, clientSet, jobObj, index, func(_ *v1.Pod) bool { return true })
if err != nil {
t.Fatalf("Failed to list job %q pods for index %v, error: %v", klog.KObj(jobObj), index, err)
}
validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, podsForIndex)
}
}
// TestBackoffLimitPerIndex tests handling of job and its pods when
// backoff limit per index is used.
func TestBackoffLimitPerIndex(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
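// podTerminationWithExpectations describes a single pod termination applied
// during a test case, together with the Job status and index sets expected
// right after that termination.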
type podTerminationWithExpectations struct {
index int
status v1.PodStatus
wantActive int
wantFailed int
wantSucceeded int
wantActiveIndexes sets.Set[int]
wantCompletedIndexes string
wantFailedIndexes *string
wantReplacementPodFailureCount *int
}
podTemplateSpec := v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main-container",
Image: "foo",
ImagePullPolicy: v1.PullIfNotPresent,
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
},
},
},
}
testCases := map[string]struct {
job batchv1.Job
podTerminations []podTerminationWithExpectations
wantJobConditionType batchv1.JobConditionType
}{
"job succeeded": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
Template: podTemplateSpec,
},
},
podTerminations: []podTerminationWithExpectations{
{
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 2,
wantFailed: 1,
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: pointer.String(""),
wantReplacementPodFailureCount: pointer.Int(1),
},
},
wantJobConditionType: batchv1.JobComplete,
},
"job index fails due to exceeding backoff limit per index": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(2),
Template: podTemplateSpec,
},
},
podTerminations: []podTerminationWithExpectations{
{
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 2,
wantFailed: 1,
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: pointer.String(""),
wantReplacementPodFailureCount: pointer.Int(1),
},
{
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 2,
wantFailed: 2,
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: pointer.String(""),
wantReplacementPodFailureCount: pointer.Int(2),
},
{
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 1,
wantFailed: 3,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: pointer.String("0"),
},
},
wantJobConditionType: batchv1.JobFailed,
},
"job index fails due to exceeding the global backoff limit first": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(3),
Completions: pointer.Int32(3),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
BackoffLimit: pointer.Int32(2),
Template: podTemplateSpec,
},
},
podTerminations: []podTerminationWithExpectations{
{
index: 0,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 3,
wantFailed: 1,
wantActiveIndexes: sets.New(0, 1, 2),
wantFailedIndexes: pointer.String(""),
},
{
index: 1,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 3,
wantFailed: 2,
wantActiveIndexes: sets.New(0, 1, 2),
wantFailedIndexes: pointer.String(""),
},
{
index: 2,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
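// Exceeding the global backoffLimit is expected to also terminate the two
// remaining active pods, so they are counted as failed on top of the three
// induced failures (expectation inferred from the counts below).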
wantFailed: 5,
wantFailedIndexes: pointer.String(""),
},
},
wantJobConditionType: batchv1.JobFailed,
},
"job continues execution after a failed index, the job is marked Failed due to the failed index": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(0),
Template: podTemplateSpec,
},
},
podTerminations: []podTerminationWithExpectations{
{
index: 0,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 1,
wantFailed: 1,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: pointer.String("0"),
},
{
index: 1,
status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
wantFailed: 1,
wantSucceeded: 1,
wantFailedIndexes: pointer.String("0"),
wantCompletedIndexes: "1",
},
},
wantJobConditionType: batchv1.JobFailed,
},
"job execution terminated early due to exceeding max failed indexes": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(3),
Completions: pointer.Int32(3),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(0),
MaxFailedIndexes: pointer.Int32(1),
Template: podTemplateSpec,
},
},
podTerminations: []podTerminationWithExpectations{
{
index: 0,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 2,
wantFailed: 1,
wantActiveIndexes: sets.New(1, 2),
wantFailedIndexes: pointer.String("0"),
},
{
index: 1,
status: v1.PodStatus{
Phase: v1.PodFailed,
},
wantActive: 0,
wantFailed: 3,
wantFailedIndexes: pointer.String("0,1"),
},
},
wantJobConditionType: batchv1.JobFailed,
},
"pod failure matching pod failure policy rule with FailIndex action": {
job: batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32(2),
Completions: pointer.Int32(2),
CompletionMode: completionModePtr(batchv1.IndexedCompletion),
BackoffLimitPerIndex: pointer.Int32(1),
Template: podTemplateSpec,
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailIndex,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{13},
},
},
{
Action: batchv1.PodFailurePolicyActionFailIndex,
OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
},
},
},
},
podTerminations: []podTerminationWithExpectations{
{
index: 0,
status: v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 13,
},
},
},
},
},
wantActive: 1,
wantFailed: 1,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: pointer.String("0"),
},
{
index: 1,
status: v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
},
},
},
wantFailed: 2,
wantFailedIndexes: pointer.String("0,1"),
},
},
wantJobConditionType: batchv1.JobFailed,
},
}
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*test.job.Spec.Parallelism),
Ready: pointer.Int32(0),
})
for _, podTermination := range test.podTerminations {
pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
if err != nil {
t.Fatalf("listing Job Pods: %q", err)
}
pod.Status = podTermination.status
if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: podTermination.wantActive,
Succeeded: podTermination.wantSucceeded,
Failed: podTermination.wantFailed,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
if podTermination.wantReplacementPodFailureCount != nil {
replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
if err != nil {
t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", podTermination.index, err)
}
gotReplacementPodFailureCount, err := getIndexFailureCount(replacement)
if err != nil {
t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
}
if *podTermination.wantReplacementPodFailureCount != gotReplacementPodFailureCount {
t.Fatalf("Unexpected value of the index failure count annotation. Want: %v, got: %v", *podTermination.wantReplacementPodFailureCount, gotReplacementPodFailureCount)
}
}
}
remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
if remainingActive > 0 {
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
}
}
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
}
func getIndexFailureCount(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")
}
v, ok := p.Annotations[batchv1.JobIndexFailureCountAnnotation]
if !ok {
return 0, fmt.Errorf("annotation %s not found", batchv1.JobIndexFailureCountAnnotation)
}
return strconv.Atoi(v)
}
func completionModePtr(cm batchv1.CompletionMode) *batchv1.CompletionMode {
return &cm
}
// TestNonParallelJob tests a Job that only executes one Pod. The test
// recreates the Job controller at some points to make sure a new controller
// is able to pick it up.
@ -999,7 +1626,7 @@ func TestIndexedJob(t *testing.T) {
Active: 3,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "")
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
// One Pod succeeds.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@ -1010,7 +1637,7 @@ func TestIndexedJob(t *testing.T) {
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1")
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
// One Pod fails, which should be recreated.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@ -1022,7 +1649,7 @@ func TestIndexedJob(t *testing.T) {
Succeeded: 1,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1")
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
// Remaining Pods succeed.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
@ -1034,7 +1661,7 @@ func TestIndexedJob(t *testing.T) {
Succeeded: 4,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil)
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
@ -1208,7 +1835,7 @@ func TestElasticIndexedJob(t *testing.T) {
Failed: update.wantFailed,
Ready: pointer.Int32(0),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes)
validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
@ -1424,10 +2051,14 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
if len(jobPods) != 3 {
t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods))
}
validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, jobPods)
}
func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time.Duration, pods []*v1.Pod) {
t.Helper()
creationTime := []time.Time{}
finishTime := []time.Time{}
for _, pod := range jobPods {
for _, pod := range pods {
creationTime = append(creationTime, pod.CreationTimestamp.Time)
if len(pod.Status.ContainerStatuses) > 0 {
finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time)
@ -1441,25 +2072,24 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
return finishTime[i].Before(finishTime[j])
})
if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobPodFailureBackOff)
diff := creationTime[1].Sub(finishTime[0])
if diff < defaultPodFailureBackoff {
t.Fatalf("Second pod should be created at least %v seconds after the first pod, time difference: %v", defaultPodFailureBackoff, diff)
}
if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
if diff >= 2*defaultPodFailureBackoff {
t.Fatalf("Second pod should be created before %v seconds after the first pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
}
diff := creationTime[2].Sub(finishTime[1]).Seconds()
diff = creationTime[2].Sub(finishTime[1])
// The third pod should not be created before 2x the backoff has elapsed
if diff < 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
if diff < 2*defaultPodFailureBackoff {
t.Fatalf("Third pod should be created at least %v seconds after the second pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
}
// The third pod should be created within 4x the backoff
// This check rules out double counting
if diff >= 4*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobPodFailureBackOff)
if diff >= 4*defaultPodFailureBackoff {
t.Fatalf("Third pod should be created before %v seconds after the second pod, time difference: %v", 4*defaultPodFailureBackoff, diff)
}
}
@ -1815,7 +2445,7 @@ func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSe
// validateIndexedJobPods validates the indexes and hostname of active and
// completed Pods of an Indexed Job, as well as the failed indexes recorded in
// the Job status.
// Call after validateJobPodsStatus
func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string) {
func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string, wantFailed *string) {
t.Helper()
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
@ -1824,6 +2454,9 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
if updatedJob.Status.CompletedIndexes != gotCompleted {
t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted)
}
if diff := cmp.Diff(wantFailed, updatedJob.Status.FailedIndexes); diff != "" {
t.Errorf("Got unexpected failed indexes: %s", diff)
}
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
@ -2005,6 +2638,17 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
pod.Status.Phase = phase
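// Also record a terminated container status with a finish time so that tests
// measuring the pod recreation backoff (e.g. validateExpotentialBackoffDelay)
// can read when the pod finished.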
if phase == v1.PodFailed || phase == v1.PodSucceeded {
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: metav1.Now(),
},
},
},
}
}
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
@ -2015,6 +2659,44 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
return errors.New("no pod matching index found")
}
func getActivePodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int) (*v1.Pod, error) {
return getJobPodForIndex(ctx, clientSet, jobObj, ix, func(p *v1.Pod) bool {
return !podutil.IsPodTerminal(p)
})
}
func getJobPodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) (*v1.Pod, error) {
pods, err := getJobPodsForIndex(ctx, clientSet, jobObj, ix, filter)
if err != nil {
return nil, err
}
if len(pods) == 0 {
return nil, fmt.Errorf("Pod not found for index: %v", ix)
}
return pods[0], nil
}
func getJobPodsForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) ([]*v1.Pod, error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("listing Job Pods: %w", err)
}
var result []*v1.Pod
for _, pod := range pods.Items {
pod := pod
if !metav1.IsControlledBy(&pod, jobObj) {
continue
}
if !filter(&pod) {
continue
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
result = append(result, &pod)
}
}
return result, nil
}
func getCompletionIndex(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")