From 8812531b8caa50c656f2b4db274845ac77f99f2b Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 6 Jan 2021 16:51:44 -0500 Subject: [PATCH] Add completion index to Job Pods When .spec.completionMode="Indexed" --- pkg/controller/job/indexed_job_utils.go | 214 +++++++++++++++ pkg/controller/job/indexed_job_utils_test.go | 256 +++++++++++++++++ pkg/controller/job/job_controller.go | 214 +++++++++------ pkg/controller/job/job_controller_test.go | 272 +++++++++++++++---- 4 files changed, 816 insertions(+), 140 deletions(-) create mode 100644 pkg/controller/job/indexed_job_utils.go create mode 100644 pkg/controller/job/indexed_job_utils_test.go diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go new file mode 100644 index 00000000000..6fb11c99932 --- /dev/null +++ b/pkg/controller/job/indexed_job_utils.go @@ -0,0 +1,214 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "math" + "sort" + "strconv" + "strings" + + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/controller" +) + +const ( + completionIndexEnvName = "JOB_COMPLETION_INDEX" + unknownCompletionIndex = -1 +) + +// calculateCompletedIndexesStr returns a string representation of the list +// of completed indexes in compressed format. +func calculateCompletedIndexesStr(pods []*v1.Pod) string { + sort.Sort(byCompletionIndex(pods)) + var result strings.Builder + var lastSucceeded int + firstSucceeded := math.MinInt32 + for _, p := range pods { + ix := getCompletionIndex(p.Annotations) + if ix == unknownCompletionIndex { + continue + } + if p.Status.Phase == v1.PodSucceeded { + if firstSucceeded == math.MinInt32 { + firstSucceeded = ix + } else if ix > lastSucceeded+1 { + addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) + firstSucceeded = ix + } + lastSucceeded = ix + } + } + if firstSucceeded != math.MinInt32 { + addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) + } + return result.String() +} + +func addSingleOrRangeStr(builder *strings.Builder, first, last int) { + if builder.Len() > 0 { + builder.WriteRune(',') + } + builder.WriteString(strconv.Itoa(first)) + if last > first { + if last == first+1 { + builder.WriteRune(',') + } else { + builder.WriteRune('-') + } + builder.WriteString(strconv.Itoa(last)) + } +} + +// firstPendingIndexes returns `count` indexes less than `completions` that are +// not covered by running or succeeded pods. +func firstPendingIndexes(pods []*v1.Pod, count, completions int) []int { + if count == 0 { + return nil + } + nonPending := sets.NewInt() + for _, p := range pods { + if p.Status.Phase == v1.PodSucceeded || controller.IsPodActive(p) { + ix := getCompletionIndex(p.Annotations) + if ix != unknownCompletionIndex { + nonPending.Insert(ix) + } + } + } + result := make([]int, 0, count) + // The following algorithm is bounded by the number of non pending pods and + // parallelism. 
+ // TODO(#99368): Convert the list of non-pending pods into a set of + // non-pending intervals from the job's .status.completedIndexes and active + // pods. + candidate := 0 + for _, np := range nonPending.List() { + for ; candidate < np && candidate < completions; candidate++ { + result = append(result, candidate) + if len(result) == count { + return result + } + } + candidate = np + 1 + } + for ; candidate < completions && len(result) < count; candidate++ { + result = append(result, candidate) + } + return result +} + +// appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated +// completion indexes. For each index, it selects n-1 pods for removal, where n +// is the number of repetitions. The pods to be removed are appended to `rm`, +// while the remaining pods are appended to `left`. +// All pods that don't have a completion index are appended to `rm`. +func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) { + sort.Sort(byCompletionIndex(pods)) + lastIndex := unknownCompletionIndex + firstRepeatPos := 0 + for i, p := range pods { + ix := getCompletionIndex(p.Annotations) + if ix != lastIndex { + rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex) + firstRepeatPos = i + lastIndex = ix + } + } + return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:], lastIndex) +} + +func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) { + if ix == unknownCompletionIndex { + rm = append(rm, pods...) + return rm, left + } + if len(pods) == 1 { + left = append(left, pods[0]) + return rm, left + } + sort.Sort(controller.ActivePods(pods)) + rm = append(rm, pods[:len(pods)-1]...) + left = append(left, pods[len(pods)-1]) + return rm, left +} + +func getCompletionIndex(annotations map[string]string) int { + if annotations == nil { + return unknownCompletionIndex + } + v, ok := annotations[batch.JobCompletionIndexAnnotationAlpha] + if !ok { + return unknownCompletionIndex + } + i, err := strconv.Atoi(v) + if err != nil { + return unknownCompletionIndex + } + if i < 0 { + return unknownCompletionIndex + } + return i +} + +func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) { + for i := range template.Spec.InitContainers { + addCompletionIndexEnvVariable(&template.Spec.InitContainers[i]) + } + for i := range template.Spec.Containers { + addCompletionIndexEnvVariable(&template.Spec.Containers[i]) + } +} + +func addCompletionIndexEnvVariable(container *v1.Container) { + for _, v := range container.Env { + if v.Name == completionIndexEnvName { + return + } + } + container.Env = append(container.Env, v1.EnvVar{ + Name: completionIndexEnvName, + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha), + }, + }, + }) +} + +func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) { + if template.Annotations == nil { + template.Annotations = make(map[string]string, 1) + } + template.Annotations[batch.JobCompletionIndexAnnotationAlpha] = strconv.Itoa(index) +} + +type byCompletionIndex []*v1.Pod + +func (bci byCompletionIndex) Less(i, j int) bool { + return getCompletionIndex(bci[i].Annotations) < getCompletionIndex(bci[j].Annotations) +} + +func (bci byCompletionIndex) Swap(i, j int) { + bci[i], bci[j] = bci[j], bci[i] +} + +func (bci byCompletionIndex) Len() int { + return len(bci) +} diff --git 
a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go new file mode 100644 index 00000000000..47b883027e5 --- /dev/null +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" +) + +const noIndex = "-" + +func TestCalculateCompletedIndexesStr(t *testing.T) { + cases := map[string][]indexPhase{ + "1": { + {"1", v1.PodSucceeded}, + }, + "5,10": { + {"2", v1.PodFailed}, + {"5", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {"10", v1.PodFailed}, + {"10", v1.PodSucceeded}, + }, + "2,3,5-7": { + {"0", v1.PodRunning}, + {"1", v1.PodPending}, + {"2", v1.PodSucceeded}, + {"3", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {"6", v1.PodSucceeded}, + {"7", v1.PodSucceeded}, + }, + "0-2": { + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"1", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + {"3", v1.PodFailed}, + }, + "0,2-5": { + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"2", v1.PodSucceeded}, + {"3", v1.PodSucceeded}, + {"4", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {noIndex, v1.PodSucceeded}, + {"-2", v1.PodSucceeded}, + }, + } + for want, tc := range cases { + t.Run(want, func(t *testing.T) { + pods := hollowPodsWithIndexPhase(tc) + gotStr := calculateCompletedIndexesStr(pods) + if diff := cmp.Diff(want, gotStr); diff != "" { + t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) + } + }) + } +} + +func TestFirstPendingIndexes(t *testing.T) { + cases := map[string]struct { + cnt int + completions int + pods []indexPhase + want []int + }{ + "cnt greater than completions": { + cnt: 5, + completions: 3, + want: []int{0, 1, 2}, + }, + "cnt less than completions": { + cnt: 2, + completions: 5, + want: []int{0, 1}, + }, + "first pods running or succeeded": { + pods: []indexPhase{ + {"0", v1.PodRunning}, + {"1", v1.PodPending}, + {"2", v1.PodFailed}, + }, + cnt: 3, + completions: 10, + want: []int{2, 3, 4}, + }, + "last pods running or succeeded": { + pods: []indexPhase{ + {"4", v1.PodFailed}, + {"5", v1.PodSucceeded}, + {"6", v1.PodPending}, + }, + cnt: 6, + completions: 6, + want: []int{0, 1, 2, 3, 4}, + }, + "mixed": { + pods: []indexPhase{ + {"1", v1.PodFailed}, + {"2", v1.PodSucceeded}, + {"3", v1.PodPending}, + {"5", v1.PodFailed}, + {"5", v1.PodRunning}, + {"8", v1.PodPending}, + {noIndex, v1.PodRunning}, + {"-3", v1.PodRunning}, + }, + cnt: 5, + completions: 10, + want: []int{0, 1, 4, 6, 7}, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + pods := hollowPodsWithIndexPhase(tc.pods) + got := firstPendingIndexes(pods, tc.cnt, tc.completions) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff) + } + }) + } +} + +func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { + cases := map[string]struct { + pods 
[]indexPhase + wantRm []indexPhase + wantLeft []indexPhase + }{ + "all unique": { + pods: []indexPhase{ + {noIndex, v1.PodPending}, + {"2", v1.PodPending}, + {"5", v1.PodRunning}, + }, + wantRm: []indexPhase{ + {noIndex, v1.PodPending}, + }, + wantLeft: []indexPhase{ + {"2", v1.PodPending}, + {"5", v1.PodRunning}, + }, + }, + "all with index": { + pods: []indexPhase{ + {"5", v1.PodPending}, + {"0", v1.PodRunning}, + {"3", v1.PodPending}, + {"0", v1.PodRunning}, + {"3", v1.PodRunning}, + {"0", v1.PodPending}, + }, + wantRm: []indexPhase{ + {"0", v1.PodPending}, + {"0", v1.PodRunning}, + {"3", v1.PodPending}, + }, + wantLeft: []indexPhase{ + {"0", v1.PodRunning}, + {"3", v1.PodRunning}, + {"5", v1.PodPending}, + }, + }, + "mixed": { + pods: []indexPhase{ + {noIndex, v1.PodPending}, + {"invalid", v1.PodRunning}, + {"-2", v1.PodRunning}, + {"0", v1.PodPending}, + {"1", v1.PodPending}, + {"1", v1.PodPending}, + {"1", v1.PodRunning}, + }, + wantRm: []indexPhase{ + {noIndex, v1.PodPending}, + {"invalid", v1.PodRunning}, + {"-2", v1.PodRunning}, + {"1", v1.PodPending}, + {"1", v1.PodPending}, + }, + wantLeft: []indexPhase{ + {"0", v1.PodPending}, + {"1", v1.PodRunning}, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + pods := hollowPodsWithIndexPhase(tc.pods) + rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods) + rmInt := toIndexPhases(rm) + leftInt := toIndexPhases(left) + if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" { + t.Errorf("Unexpected pods for removal (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantLeft, leftInt); diff != "" { + t.Errorf("Unexpected pods to keep (-want,+got):\n%s", diff) + } + }) + } +} + +func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod { + pods := make([]*v1.Pod, 0, len(descs)) + for _, desc := range descs { + p := &v1.Pod{ + Status: v1.PodStatus{ + Phase: desc.Phase, + }, + } + if desc.Index != noIndex { + p.Annotations = map[string]string{ + batch.JobCompletionIndexAnnotationAlpha: desc.Index, + } + } + pods = append(pods, p) + } + return pods +} + +type indexPhase struct { + Index string + Phase v1.PodPhase +} + +func toIndexPhases(pods []*v1.Pod) []indexPhase { + result := make([]indexPhase, len(pods)) + for i, p := range pods { + index := noIndex + if p.Annotations != nil { + index = p.Annotations[batch.JobCompletionIndexAnnotationAlpha] + } + result[i] = indexPhase{index, p.Status.Phase} + } + return result +} diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 9e80ec1778a..560fdac815e 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -23,16 +23,17 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "time" batch "k8s.io/api/batch/v1" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" batchinformers "k8s.io/client-go/informers/batch/v1" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -46,6 +47,7 @@ import ( "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/integer" ) @@ -453,7 +455,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { } sharedJob, err := 
jm.jobLister.Jobs(ns).Get(name) if err != nil { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { klog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) return true, nil @@ -467,6 +469,12 @@ func (jm *Controller) syncJob(key string) (bool, error) { return true, nil } + // Cannot create Pods if this is an Indexed Job and the feature is disabled. + if !utilfeature.DefaultFeatureGate.Enabled(features.IndexedJob) && job.Spec.CompletionMode == batch.IndexedCompletion { + jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.") + return false, nil + } + // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters // the store after we've checked the expectation, the job sync is just deferred till the next relist. @@ -476,10 +484,9 @@ func (jm *Controller) syncJob(key string) (bool, error) { if err != nil { return false, err } - activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(pods) + succeeded, failed := getStatus(&job, pods) conditions := len(job.Status.Conditions) // job first start if job.Status.StartTime == nil { @@ -518,15 +525,9 @@ func (jm *Controller) syncJob(key string) (bool, error) { } if jobFailed { - errCh := make(chan error, active) - jm.deleteJobPods(&job, activePods, errCh) - select { - case manageJobErr = <-errCh: - if manageJobErr != nil { - break - } - default: - } + // TODO(#28486): Account for pod failures in status once we can track + // completions without lingering pods. + _, manageJobErr = jm.deleteJobPods(&job, "", activePods) // update status values accordingly failed += active @@ -535,7 +536,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) } else { if jobNeedsSync && job.DeletionTimestamp == nil { - active, manageJobErr = jm.manageJob(activePods, succeeded, &job) + active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods) } completions := succeeded complete := false @@ -586,6 +587,9 @@ func (jm *Controller) syncJob(key string) (bool, error) { job.Status.Active = active job.Status.Succeeded = succeeded job.Status.Failed = failed + if job.Spec.CompletionMode == batch.IndexedCompletion { + job.Status.CompletedIndexes = calculateCompletedIndexesStr(pods) + } if err := jm.updateHandler(&job); err != nil { return forget, err @@ -602,27 +606,32 @@ func (jm *Controller) syncJob(key string) (bool, error) { return forget, manageJobErr } -func (jm *Controller) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) { - // TODO: below code should be replaced with pod termination resulting in - // pod failures, rather than killing pods. Unfortunately none such solution - // exists ATM. There's an open discussion in the topic in - // https://github.com/kubernetes/kubernetes/issues/14602 which might give - // some sort of solution to above problem. 
-	// kill remaining active pods
-	wait := sync.WaitGroup{}
-	nbPods := len(pods)
-	wait.Add(nbPods)
-	for i := int32(0); i < int32(nbPods); i++ {
-		go func(ix int32) {
-			defer wait.Done()
-			if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil && !apierrors.IsNotFound(err) {
-				defer utilruntime.HandleError(err)
-				klog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
-				errCh <- err
+// deleteJobPods deletes the pods, returns the number of successful removals
+// and any error.
+func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
+	errCh := make(chan error, len(pods))
+	successfulDeletes := int32(len(pods))
+	wg := sync.WaitGroup{}
+	wg.Add(len(pods))
+	for i := range pods {
+		go func(pod *v1.Pod) {
+			defer wg.Done()
+			if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil {
+				// Decrement the expected number of deletes because the informer won't observe this deletion
+				if jobKey != "" {
+					jm.expectations.DeletionObserved(jobKey)
+				}
+				if !apierrors.IsNotFound(err) {
+					klog.V(2).InfoS("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
+					atomic.AddInt32(&successfulDeletes, -1)
+					errCh <- err
+					utilruntime.HandleError(err)
+				}
 			}
-		}(i)
+		}(pods[i])
 	}
-	wait.Wait()
+	wg.Wait()
+	return successfulDeletes, errorFromChannel(errCh)
 }
 
 // pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
@@ -675,17 +684,16 @@ func newCondition(conditionType batch.JobConditionType, reason, message string)
 }
 
 // getStatus returns no of succeeded and failed pods running a job
-func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
-	succeeded = int32(filterPods(pods, v1.PodSucceeded))
-	failed = int32(filterPods(pods, v1.PodFailed))
+func getStatus(job *batch.Job, pods []*v1.Pod) (succeeded, failed int32) {
+	succeeded = int32(countPodsByPhase(job, pods, v1.PodSucceeded))
+	failed = int32(countPodsByPhase(job, pods, v1.PodFailed))
 	return
 }
 
 // manageJob is the core method responsible for managing the number of running
 // pods according to what is specified in the job.Spec.
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
-	var activeLock sync.Mutex
+func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, error) {
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -694,41 +702,22 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
 		return 0, nil
 	}
 
-	var errCh chan error
-	if active > parallelism {
-		diff := active - parallelism
-		errCh = make(chan error, diff)
-		jm.expectations.ExpectDeletions(jobKey, int(diff))
-		klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
-		// Sort the pods in the order such that not-ready < ready, unscheduled
-		// < scheduled, and pending < running. This ensures that we delete pods
-		// in the earlier stages whenever possible.
- sort.Sort(controller.ActivePods(activePods)) - - active -= diff - wait := sync.WaitGroup{} - wait.Add(int(diff)) - for i := int32(0); i < diff; i++ { - go func(ix int32) { - defer wait.Done() - if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil { - // Decrement the expected number of deletes because the informer won't observe this deletion - jm.expectations.DeletionObserved(jobKey) - if !apierrors.IsNotFound(err) { - klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name) - activeLock.Lock() - active++ - activeLock.Unlock() - errCh <- err - utilruntime.HandleError(err) - } - - } - }(i) + rmAtLeast := active - parallelism + if rmAtLeast < 0 { + rmAtLeast = 0 + } + podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) + if len(podsToDelete) > 0 { + jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) + klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism) + removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) + active -= removed + if err != nil { + return active, err } - wait.Wait() + } - } else if active < parallelism { + if active < parallelism { wantActive := int32(0) if job.Spec.Completions == nil { // Job does not specify a number of completions. Therefore, number active @@ -755,13 +744,25 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc if diff == 0 { return active, nil } + jm.expectations.ExpectCreations(jobKey, int(diff)) - errCh = make(chan error, diff) + errCh := make(chan error, diff) klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) - active += diff wait := sync.WaitGroup{} + var indexesToAdd []int + if job.Spec.Completions != nil && job.Spec.CompletionMode == batch.IndexedCompletion { + indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions)) + diff = int32(len(indexesToAdd)) + } + active += diff + + podTemplate := job.Spec.Template.DeepCopy() + if job.Spec.CompletionMode == batch.IndexedCompletion { + addCompletionIndexEnvVariables(podTemplate) + } + // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // and double with each successful iteration in a kind of "slow start". // This handles attempts to start large numbers of pods that would @@ -774,11 +775,21 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc errorCount := len(errCh) wait.Add(int(batchSize)) for i := int32(0); i < batchSize; i++ { + completionIndex := unknownCompletionIndex + if indexesToAdd != nil { + completionIndex = indexesToAdd[0] + indexesToAdd = indexesToAdd[1:] + } go func() { + template := podTemplate + if completionIndex != unknownCompletionIndex { + template = podTemplate.DeepCopy() + addCompletionIndexAnnotation(template, completionIndex) + } defer wait.Done() - err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind)) + err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind)) if err != nil { - if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { + if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // If the namespace is being torn down, we can safely ignore // this error since all subsequent creations will fail. 
 				return
 			}
@@ -789,9 +800,7 @@
 				// Decrement the expected number of creates because the informer won't observe this pod
 				klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
 				jm.expectations.CreationObserved(jobKey)
-				activeLock.Lock()
-				active--
-				activeLock.Unlock()
+				atomic.AddInt32(&active, -1)
 				errCh <- err
 			}
 		}()
@@ -812,20 +821,38 @@
 			}
 			diff -= batchSize
 		}
-	}
-
-	select {
-	case err := <-errCh:
-		// all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
-		if err != nil {
+		if err := errorFromChannel(errCh); err != nil {
 			return active, err
 		}
-	default:
 	}
 	return active, nil
 }
 
+// activePodsForRemoval returns Pods that should be removed because there
+// are too many pods running or, if this is an indexed job, there are repeated
+// indexes or some pods don't have indexes.
+// Sorts candidate pods in the order such that not-ready < ready, unscheduled
+// < scheduled, and pending < running. This ensures that we delete pods
+// in the earlier stages whenever possible.
+func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod {
+	var rm, left []*v1.Pod
+
+	if job.Spec.CompletionMode == batch.IndexedCompletion {
+		rm = make([]*v1.Pod, 0, rmAtLeast)
+		left = make([]*v1.Pod, 0, len(pods)-rmAtLeast)
+		rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods)
+	} else {
+		left = pods
+	}
+
+	if len(rm) < rmAtLeast {
+		sort.Sort(controller.ActivePods(left))
+		rm = append(rm, left[:rmAtLeast-len(rm)]...)
+	}
+	return rm
+}
+
 func (jm *Controller) updateJobStatus(job *batch.Job) error {
 	jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
 	var err error
@@ -864,13 +891,22 @@
 	return calculated
 }
 
-// filterPods returns pods based on their phase.
-func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
+// countPodsByPhase returns the number of pods in the given phase.
+func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int { result := 0 - for i := range pods { - if phase == pods[i].Status.Phase { + for _, p := range pods { + if phase == p.Status.Phase && (job.Spec.CompletionMode != batch.IndexedCompletion || getCompletionIndex(p.Annotations) != unknownCompletionIndex) { result++ } } return result } + +func errorFromChannel(errCh <-chan error) error { + select { + case err := <-errCh: + return err + default: + } + return nil +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 1ad4dd479ca..b948292d384 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -22,15 +22,18 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" batch "k8s.io/api/batch/v1" "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -38,14 +41,16 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/testutil" + "k8s.io/kubernetes/pkg/features" ) var alwaysReady = func() bool { return true } -func newJob(parallelism, completions, backoffLimit int32) *batch.Job { +func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { j := &batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, ObjectMeta: metav1.ObjectMeta{ @@ -57,6 +62,7 @@ func newJob(parallelism, completions, backoffLimit int32) *batch.Job { Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, }, + CompletionMode: completionMode, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -133,17 +139,31 @@ func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, acti } } +func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) { + for _, s := range status { + p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job) + p.Status = v1.PodStatus{Phase: s.Phase} + if s.Index != noIndex { + p.Annotations = map[string]string{ + batch.JobCompletionIndexAnnotationAlpha: s.Index, + } + } + podIndexer.Add(p) + } +} + func TestControllerSyncJob(t *testing.T) { jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed testCases := map[string]struct { // job setup - parallelism int32 - completions int32 - backoffLimit int32 - deleting bool - podLimit int + parallelism int32 + completions int32 + backoffLimit int32 + deleting bool + podLimit int + completionMode batch.CompletionMode // pod setup podControllerError error @@ -152,15 +172,21 @@ func TestControllerSyncJob(t *testing.T) { activePods int32 succeededPods int32 failedPods int32 + podsWithIndexes []indexPhase // expectations expectedCreations int32 expectedDeletions int32 expectedActive int32 expectedSucceeded int32 + expectedCompletedIdxs string expectedFailed int32 expectedCondition *batch.JobConditionType 
expectedConditionReason string + expectedCreatedIndexes sets.Int + + // features + indexedJobEnabled bool }{ "job start": { parallelism: 2, @@ -376,25 +402,106 @@ func TestControllerSyncJob(t *testing.T) { expectedCondition: &jobConditionFailed, expectedConditionReason: "BackoffLimitExceeded", }, + "indexed job start": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + jobKeyForget: true, + expectedCreations: 2, + expectedActive: 2, + expectedCreatedIndexes: sets.NewInt(0, 1), + indexedJobEnabled: true, + }, + "indexed job some running and completed pods": { + parallelism: 8, + completions: 20, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", v1.PodRunning}, + {"2", v1.PodSucceeded}, + {"3", v1.PodPending}, + {"4", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {"7", v1.PodSucceeded}, + {"8", v1.PodSucceeded}, + {"9", v1.PodSucceeded}, + }, + jobKeyForget: true, + expectedCreations: 6, + expectedActive: 8, + expectedSucceeded: 6, + expectedCompletedIdxs: "2,4,5,7-9", + expectedCreatedIndexes: sets.NewInt(1, 6, 10, 11, 12, 13), + indexedJobEnabled: true, + }, + "indexed job some failed pods": { + parallelism: 3, + completions: 4, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", v1.PodFailed}, + {"1", v1.PodPending}, + {"2", v1.PodFailed}, + }, + expectedCreations: 2, + expectedActive: 3, + expectedFailed: 2, + expectedCreatedIndexes: sets.NewInt(0, 2), + indexedJobEnabled: true, + }, + "indexed job some pods without index": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + activePods: 1, + succeededPods: 1, + failedPods: 1, + podsWithIndexes: []indexPhase{ + {"invalid", v1.PodRunning}, + {"invalid", v1.PodSucceeded}, + {"0", v1.PodSucceeded}, + {"1", v1.PodRunning}, + {"2", v1.PodRunning}, + }, + jobKeyForget: true, + expectedDeletions: 2, + expectedActive: 2, + expectedSucceeded: 1, + expectedCompletedIdxs: "0", + indexedJobEnabled: true, + }, + "indexed job feature disabled": { + parallelism: 2, + completions: 3, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ + {"0", v1.PodRunning}, + {"1", v1.PodSucceeded}, + }, + // No status updates. 
+ indexedJobEnabled: false, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, tc.indexedJobEnabled)() + // job manager setup - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - var actual *batch.Job - manager.updateHandler = func(job *batch.Job) error { - actual = job - return nil - } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode) if tc.deleting { now := metav1.Now() job.DeletionTimestamp = &now @@ -402,6 +509,13 @@ func TestControllerSyncJob(t *testing.T) { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods) + setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) + + actual := job + manager.updateHandler = func(job *batch.Job) error { + actual = job + return nil + } // run forget, err := manager.syncJob(testutil.GetKey(job, t)) @@ -411,7 +525,7 @@ func TestControllerSyncJob(t *testing.T) { if err == nil { t.Error("Syncing jobs expected to return error on podControl exception") } - } else if tc.failedPods > 0 && tc.expectedCondition == nil { + } else if tc.expectedCondition == nil && (hasFailingPods(tc.podsWithIndexes) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { if err == nil { t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") } @@ -429,6 +543,9 @@ func TestControllerSyncJob(t *testing.T) { if int32(len(fakePodControl.Templates)) != tc.expectedCreations { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) } + if tc.completionMode == batch.IndexedCompletion { + checkCompletionIndexesInPods(t, &fakePodControl, tc.expectedCreatedIndexes) + } if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) } @@ -461,10 +578,13 @@ func TestControllerSyncJob(t *testing.T) { if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } + if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" { + t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) + } if actual.Status.Failed != tc.expectedFailed { t.Errorf("Unexpected number of failed pods. 
Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) } - if actual.Status.StartTime == nil { + if actual.Status.StartTime == nil && tc.indexedJobEnabled { t.Error("Missing .status.startTime") } // validate conditions @@ -483,6 +603,23 @@ func TestControllerSyncJob(t *testing.T) { } } +func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int) { + t.Helper() + gotIndexes := sets.NewInt() + for _, p := range control.Templates { + checkJobCompletionEnvVariable(t, &p.Spec) + ix := getCompletionIndex(p.Annotations) + if ix == -1 { + t.Errorf("Created pod %s didn't have completion index", p.Name) + } else { + gotIndexes.Insert(ix) + } + } + if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" { + t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff) + } +} + func TestSyncJobPastDeadline(t *testing.T) { testCases := map[string]struct { // job setup @@ -568,7 +705,7 @@ func TestSyncJobPastDeadline(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0) job.Status.StartTime = &start @@ -634,7 +771,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { return nil } - job := newJob(1, 1, 6) + job := newJob(1, 1, 6, batch.NonIndexedCompletion) activeDeadlineSeconds := int64(10) job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) @@ -667,7 +804,7 @@ func TestSyncJobComplete(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - job := newJob(1, 1, 6) + job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) forget, err := manager.syncJob(testutil.GetKey(job, t)) @@ -695,7 +832,7 @@ func TestSyncJobDeleted(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } - job := newJob(2, 2, 6) + job := newJob(2, 2, 6, batch.NonIndexedCompletion) forget, err := manager.syncJob(testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) @@ -724,7 +861,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { manager.queue.AddRateLimited(testutil.GetKey(job, t)) return updateError } - job := newJob(2, 2, 6) + job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) forget, err := manager.syncJob(testutil.GetKey(job, t)) if err == nil || err != updateError { @@ -829,9 +966,9 @@ func TestGetPodsForJob(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -870,7 +1007,7 @@ func TestGetPodsForJob(t *testing.T) { } func TestGetPodsForJobAdopt(t *testing.T) { - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" clientset := 
fake.NewSimpleClientset(job1) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) @@ -896,7 +1033,7 @@ func TestGetPodsForJobAdopt(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" job1.DeletionTimestamp = &metav1.Time{} clientset := fake.NewSimpleClientset(job1) @@ -926,7 +1063,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" // The up-to-date object says it's being deleted. job1.DeletionTimestamp = &metav1.Time{} @@ -965,7 +1102,7 @@ func TestGetPodsForJobRelease(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -994,9 +1131,9 @@ func TestAddPod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1039,11 +1176,11 @@ func TestAddPodOrphan(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" - job3 := newJob(1, 1, 6) + job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) job3.Name = "job3" job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -1067,9 +1204,9 @@ func TestUpdatePod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1116,9 +1253,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1143,9 +1280,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1169,9 +1306,9 @@ func TestUpdatePodRelease(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 
:= newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1195,9 +1332,9 @@ func TestDeletePod(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -1240,11 +1377,11 @@ func TestDeletePodOrphan(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1, 6) + job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" - job2 := newJob(1, 1, 6) + job2 := newJob(1, 1, 6, batch.NonIndexedCompletion) job2.Name = "job2" - job3 := newJob(1, 1, 6) + job3 := newJob(1, 1, 6, batch.NonIndexedCompletion) job3.Name = "job3" job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) @@ -1283,7 +1420,7 @@ func TestSyncJobExpectations(t *testing.T) { manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } - job := newJob(2, 2, 6) + job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := newPodList(2, v1.PodPending, job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() @@ -1351,7 +1488,7 @@ func TestWatchJobs(t *testing.T) { } func TestWatchPods(t *testing.T) { - testJob := newJob(2, 2, 6) + testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) @@ -1450,7 +1587,7 @@ func TestJobBackoffReset(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) key := testutil.GetKey(job, t) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() @@ -1499,7 +1636,7 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio } func TestJobBackoff(t *testing.T) { - job := newJob(1, 1, 1) + job := newJob(1, 1, 1, batch.NonIndexedCompletion) oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) oldPod.Status.Phase = v1.PodRunning oldPod.ResourceVersion = "1" @@ -1630,7 +1767,7 @@ func TestJobBackoffForOnFailure(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() @@ -1732,7 +1869,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + job := newJob(tc.parallelism, 
tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() @@ -1769,3 +1906,36 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { }) } } + +func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { + t.Helper() + want := []v1.EnvVar{ + { + Name: "JOB_COMPLETION_INDEX", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha), + }, + }, + }, + } + for _, c := range spec.InitContainers { + if diff := cmp.Diff(want, c.Env); diff != "" { + t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) + } + } + for _, c := range spec.Containers { + if diff := cmp.Diff(want, c.Env); diff != "" { + t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff) + } + } +} + +func hasFailingPods(status []indexPhase) bool { + for _, s := range status { + if s.Phase == v1.PodFailed { + return true + } + } + return false +}
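
Reviewer note, not part of the patch: the changes above surface the completion index to the workload in two ways, as a pod annotation keyed by batch.JobCompletionIndexAnnotationAlpha (set in addCompletionIndexAnnotation) and as the JOB_COMPLETION_INDEX environment variable projected from that annotation through the downward API (set in addCompletionIndexEnvVariables). A minimal sketch of a worker consuming the variable follows; the one-input-file-per-index sharding scheme is an assumption for illustration only, nothing in this patch prescribes it.

    package main

    import (
    	"fmt"
    	"log"
    	"os"
    	"strconv"
    )

    func main() {
    	// Injected by the Job controller for Indexed Jobs (requires the
    	// IndexedJob feature gate); absent for NonIndexed Jobs.
    	raw, ok := os.LookupEnv("JOB_COMPLETION_INDEX")
    	if !ok {
    		log.Fatal("JOB_COMPLETION_INDEX not set")
    	}
    	ix, err := strconv.Atoi(raw)
    	if err != nil {
    		log.Fatalf("malformed completion index %q: %v", raw, err)
    	}
    	// Hypothetical sharding: completion index i processes inputs/i.txt.
    	data, err := os.ReadFile(fmt.Sprintf("inputs/%d.txt", ix))
    	if err != nil {
    		log.Fatalf("reading shard %d: %v", ix, err)
    	}
    	fmt.Printf("shard %d: %d bytes\n", ix, len(data))
    }

Because the variable is projected from the annotation, init containers and restarted containers in the same pod always observe the same index; duplicate pods for one index, which activePodsForRemoval eventually cleans up, may briefly run the same shard, so the payload should be idempotent.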