Merge pull request #98812 from alculquicondor/indexed-job

Indexed job implementation
This commit is contained in:
Kubernetes Prow Robot 2021-03-03 15:55:59 -08:00 committed by GitHub
commit afb1ee3c9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 816 additions and 140 deletions

View File

@ -0,0 +1,214 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/controller"
)
const (
	// completionIndexEnvName is the name of the environment variable injected
	// into containers to expose the pod's completion index.
	completionIndexEnvName = "JOB_COMPLETION_INDEX"
	// unknownCompletionIndex is the sentinel returned when a pod has no
	// valid (missing, non-numeric or negative) completion index annotation.
	unknownCompletionIndex = -1
)
// calculateCompletedIndexesStr returns a string representation of the list
// of completed indexes in compressed format.
func calculateCompletedIndexesStr(pods []*v1.Pod) string {
	sort.Sort(byCompletionIndex(pods))
	var sb strings.Builder
	// rangeStart marks the start of the current run of consecutive succeeded
	// indexes; math.MinInt32 means no run has been opened yet.
	rangeStart := math.MinInt32
	rangeEnd := 0
	for _, pod := range pods {
		idx := getCompletionIndex(pod.Annotations)
		// Only pods with a valid index that succeeded contribute to the list.
		if idx == unknownCompletionIndex || pod.Status.Phase != v1.PodSucceeded {
			continue
		}
		if rangeStart == math.MinInt32 {
			rangeStart = idx
		} else if idx > rangeEnd+1 {
			// The run is broken; flush it and open a new one at idx.
			addSingleOrRangeStr(&sb, rangeStart, rangeEnd)
			rangeStart = idx
		}
		rangeEnd = idx
	}
	// Flush the final open run, if any.
	if rangeStart != math.MinInt32 {
		addSingleOrRangeStr(&sb, rangeStart, rangeEnd)
	}
	return sb.String()
}
func addSingleOrRangeStr(builder *strings.Builder, first, last int) {
if builder.Len() > 0 {
builder.WriteRune(',')
}
builder.WriteString(strconv.Itoa(first))
if last > first {
if last == first+1 {
builder.WriteRune(',')
} else {
builder.WriteRune('-')
}
builder.WriteString(strconv.Itoa(last))
}
}
// firstPendingIndexes returns `count` indexes less than `completions` that are
// not covered by running or succeeded pods.
func firstPendingIndexes(pods []*v1.Pod, count, completions int) []int {
	if count == 0 {
		return nil
	}
	// Collect the indexes already taken by succeeded or active pods; failed
	// pods leave their index free to be retried.
	taken := sets.NewInt()
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodSucceeded && !controller.IsPodActive(pod) {
			continue
		}
		if idx := getCompletionIndex(pod.Annotations); idx != unknownCompletionIndex {
			taken.Insert(idx)
		}
	}
	result := make([]int, 0, count)
	// The following algorithm is bounded by the number of non pending pods and
	// parallelism.
	// TODO(#99368): Convert the list of non-pending pods into a set of
	// non-pending intervals from the job's .status.completedIndexes and active
	// pods.
	next := 0
	// taken.List() is sorted, so emit every gap before each taken index.
	for _, t := range taken.List() {
		for next < t && next < completions {
			result = append(result, next)
			next++
			if len(result) == count {
				return result
			}
		}
		next = t + 1
	}
	// Fill from the tail past the last taken index, up to completions.
	for next < completions && len(result) < count {
		result = append(result, next)
		next++
	}
	return result
}
// appendDuplicatedIndexPodsForRemoval scans active `pods` for duplicated
// completion indexes. For each index, it selects n-1 pods for removal, where n
// is the number of repetitions. The pods to be removed are appended to `rm`,
// while the remaining pods are appended to `left`.
// All pods that don't have a completion index are appended to `rm`.
func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) {
	// Sorting groups pods that share an index into contiguous runs; pods
	// without an index (unknownCompletionIndex == -1) sort first.
	sort.Sort(byCompletionIndex(pods))
	prevIndex := unknownCompletionIndex
	groupStart := 0
	for i, pod := range pods {
		if idx := getCompletionIndex(pod.Annotations); idx != prevIndex {
			// pods[groupStart:i] all carry prevIndex; decide their fate.
			// On the first iteration this is an empty slice, which is a no-op.
			rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[groupStart:i], prevIndex)
			groupStart = i
			prevIndex = idx
		}
	}
	// Flush the trailing group.
	return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[groupStart:], prevIndex)
}
// appendPodsWithSameIndexForRemovalAndRemaining distributes pods sharing the
// completion index ix between the removal list rm and the keep list left:
// pods without a valid index are all removed; from a group of duplicates only
// the most advanced pod (per controller.ActivePods ordering) is kept.
func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) {
	switch {
	case ix == unknownCompletionIndex:
		// No usable index: every pod in the group must go.
		return append(rm, pods...), left
	case len(pods) == 1:
		// Unique index: keep it.
		return rm, append(left, pods[0])
	}
	// Duplicates: keep the last pod after sorting by activity, remove the rest.
	sort.Sort(controller.ActivePods(pods))
	return append(rm, pods[:len(pods)-1]...), append(left, pods[len(pods)-1])
}
func getCompletionIndex(annotations map[string]string) int {
if annotations == nil {
return unknownCompletionIndex
}
v, ok := annotations[batch.JobCompletionIndexAnnotationAlpha]
if !ok {
return unknownCompletionIndex
}
i, err := strconv.Atoi(v)
if err != nil {
return unknownCompletionIndex
}
if i < 0 {
return unknownCompletionIndex
}
return i
}
// addCompletionIndexEnvVariables ensures every init container and regular
// container in the template exposes the completion index env variable.
func addCompletionIndexEnvVariables(template *v1.PodTemplateSpec) {
	spec := &template.Spec
	for i := range spec.InitContainers {
		addCompletionIndexEnvVariable(&spec.InitContainers[i])
	}
	for i := range spec.Containers {
		addCompletionIndexEnvVariable(&spec.Containers[i])
	}
}
// addCompletionIndexEnvVariable adds the JOB_COMPLETION_INDEX env variable,
// sourced from the completion index annotation via the downward API, unless
// the container already defines a variable with that name.
func addCompletionIndexEnvVariable(container *v1.Container) {
	for i := range container.Env {
		if container.Env[i].Name == completionIndexEnvName {
			// User-provided value wins; don't override it.
			return
		}
	}
	fieldPath := fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha)
	container.Env = append(container.Env, v1.EnvVar{
		Name: completionIndexEnvName,
		ValueFrom: &v1.EnvVarSource{
			FieldRef: &v1.ObjectFieldSelector{FieldPath: fieldPath},
		},
	})
}
// addCompletionIndexAnnotation records `index` as the completion index
// annotation on the pod template, allocating the annotations map if needed.
func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
	if template.Annotations == nil {
		template.Annotations = map[string]string{}
	}
	template.Annotations[batch.JobCompletionIndexAnnotationAlpha] = strconv.Itoa(index)
}
// byCompletionIndex sorts pods in ascending order of their completion index
// annotation; pods without a valid index (unknownCompletionIndex == -1) sort
// first.
type byCompletionIndex []*v1.Pod

func (bci byCompletionIndex) Len() int { return len(bci) }

func (bci byCompletionIndex) Less(i, j int) bool {
	return getCompletionIndex(bci[i].Annotations) < getCompletionIndex(bci[j].Annotations)
}

func (bci byCompletionIndex) Swap(i, j int) {
	bci[i], bci[j] = bci[j], bci[i]
}

View File

@ -0,0 +1,256 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
)
// noIndex marks a test pod that should get no completion index annotation.
const noIndex = "-"
// TestCalculateCompletedIndexesStr verifies the compressed string
// representation of completed indexes. Each map key is the expected output
// for pods built from the associated indexPhase descriptors.
func TestCalculateCompletedIndexesStr(t *testing.T) {
	cases := map[string][]indexPhase{
		"1": {
			{"1", v1.PodSucceeded},
		},
		// Failed pods and duplicated succeeded indexes must not affect the
		// result.
		"5,10": {
			{"2", v1.PodFailed},
			{"5", v1.PodSucceeded},
			{"5", v1.PodSucceeded},
			{"10", v1.PodFailed},
			{"10", v1.PodSucceeded},
		},
		// Two consecutive indexes are listed individually ("2,3"); three or
		// more are compressed into a range ("5-7").
		"2,3,5-7": {
			{"0", v1.PodRunning},
			{"1", v1.PodPending},
			{"2", v1.PodSucceeded},
			{"3", v1.PodSucceeded},
			{"5", v1.PodSucceeded},
			{"6", v1.PodSucceeded},
			{"7", v1.PodSucceeded},
		},
		"0-2": {
			{"0", v1.PodSucceeded},
			{"1", v1.PodFailed},
			{"1", v1.PodSucceeded},
			{"2", v1.PodSucceeded},
			{"2", v1.PodSucceeded},
			{"3", v1.PodFailed},
		},
		// Pods with a missing or negative index are ignored even if they
		// succeeded.
		"0,2-5": {
			{"0", v1.PodSucceeded},
			{"1", v1.PodFailed},
			{"2", v1.PodSucceeded},
			{"3", v1.PodSucceeded},
			{"4", v1.PodSucceeded},
			{"5", v1.PodSucceeded},
			{noIndex, v1.PodSucceeded},
			{"-2", v1.PodSucceeded},
		},
	}
	for want, tc := range cases {
		t.Run(want, func(t *testing.T) {
			pods := hollowPodsWithIndexPhase(tc)
			gotStr := calculateCompletedIndexesStr(pods)
			if diff := cmp.Diff(want, gotStr); diff != "" {
				t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
			}
		})
	}
}
// TestFirstPendingIndexes verifies that firstPendingIndexes returns the
// lowest indexes not taken by active or succeeded pods, capped by both cnt
// and completions.
func TestFirstPendingIndexes(t *testing.T) {
	cases := map[string]struct {
		cnt         int
		completions int
		pods        []indexPhase
		want        []int
	}{
		"cnt greater than completions": {
			cnt:         5,
			completions: 3,
			want:        []int{0, 1, 2},
		},
		"cnt less than completions": {
			cnt:         2,
			completions: 5,
			want:        []int{0, 1},
		},
		// A failed pod ("2") leaves its index free to be returned again.
		"first pods running or succeeded": {
			pods: []indexPhase{
				{"0", v1.PodRunning},
				{"1", v1.PodPending},
				{"2", v1.PodFailed},
			},
			cnt:         3,
			completions: 10,
			want:        []int{2, 3, 4},
		},
		"last pods running or succeeded": {
			pods: []indexPhase{
				{"4", v1.PodFailed},
				{"5", v1.PodSucceeded},
				{"6", v1.PodPending},
			},
			cnt:         6,
			completions: 6,
			want:        []int{0, 1, 2, 3, 4},
		},
		// Pods with a missing or negative index don't reserve any index.
		"mixed": {
			pods: []indexPhase{
				{"1", v1.PodFailed},
				{"2", v1.PodSucceeded},
				{"3", v1.PodPending},
				{"5", v1.PodFailed},
				{"5", v1.PodRunning},
				{"8", v1.PodPending},
				{noIndex, v1.PodRunning},
				{"-3", v1.PodRunning},
			},
			cnt:         5,
			completions: 10,
			want:        []int{0, 1, 4, 6, 7},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			pods := hollowPodsWithIndexPhase(tc.pods)
			got := firstPendingIndexes(pods, tc.cnt, tc.completions)
			if diff := cmp.Diff(tc.want, got); diff != "" {
				t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff)
			}
		})
	}
}
// TestAppendDuplicatedIndexPodsForRemoval verifies that, for each completion
// index, only one pod is kept while the rest (and all pods with a missing or
// invalid index) are selected for removal. Expected slices are in the order
// produced by sorting by completion index.
func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
	cases := map[string]struct {
		pods     []indexPhase
		wantRm   []indexPhase
		wantLeft []indexPhase
	}{
		"all unique": {
			pods: []indexPhase{
				{noIndex, v1.PodPending},
				{"2", v1.PodPending},
				{"5", v1.PodRunning},
			},
			wantRm: []indexPhase{
				{noIndex, v1.PodPending},
			},
			wantLeft: []indexPhase{
				{"2", v1.PodPending},
				{"5", v1.PodRunning},
			},
		},
		// Within a duplicated index, the less advanced pods are removed.
		"all with index": {
			pods: []indexPhase{
				{"5", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodRunning},
				{"0", v1.PodPending},
			},
			wantRm: []indexPhase{
				{"0", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodPending},
			},
			wantLeft: []indexPhase{
				{"0", v1.PodRunning},
				{"3", v1.PodRunning},
				{"5", v1.PodPending},
			},
		},
		// Pods with noIndex, non-numeric or negative indexes are all removed.
		"mixed": {
			pods: []indexPhase{
				{noIndex, v1.PodPending},
				{"invalid", v1.PodRunning},
				{"-2", v1.PodRunning},
				{"0", v1.PodPending},
				{"1", v1.PodPending},
				{"1", v1.PodPending},
				{"1", v1.PodRunning},
			},
			wantRm: []indexPhase{
				{noIndex, v1.PodPending},
				{"invalid", v1.PodRunning},
				{"-2", v1.PodRunning},
				{"1", v1.PodPending},
				{"1", v1.PodPending},
			},
			wantLeft: []indexPhase{
				{"0", v1.PodPending},
				{"1", v1.PodRunning},
			},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			pods := hollowPodsWithIndexPhase(tc.pods)
			rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods)
			rmInt := toIndexPhases(rm)
			leftInt := toIndexPhases(left)
			if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" {
				t.Errorf("Unexpected pods for removal (-want,+got):\n%s", diff)
			}
			if diff := cmp.Diff(tc.wantLeft, leftInt); diff != "" {
				t.Errorf("Unexpected pods to keep (-want,+got):\n%s", diff)
			}
		})
	}
}
// hollowPodsWithIndexPhase builds minimal Pods carrying only a phase and,
// unless the descriptor's index is noIndex, the completion index annotation.
func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
	pods := make([]*v1.Pod, 0, len(descs))
	for _, d := range descs {
		pod := &v1.Pod{}
		pod.Status.Phase = d.Phase
		if d.Index != noIndex {
			pod.Annotations = map[string]string{
				batch.JobCompletionIndexAnnotationAlpha: d.Index,
			}
		}
		pods = append(pods, pod)
	}
	return pods
}
// indexPhase pairs a completion index (the raw annotation string, or noIndex
// for pods without the annotation) with a pod phase, for building and
// comparing test pods.
type indexPhase struct {
	Index string
	Phase v1.PodPhase
}
// toIndexPhases converts pods back into indexPhase descriptors so tests can
// compare them. A pod with a nil annotations map maps to noIndex; a non-nil
// map missing the annotation yields the empty string.
func toIndexPhases(pods []*v1.Pod) []indexPhase {
	result := make([]indexPhase, 0, len(pods))
	for _, pod := range pods {
		ip := indexPhase{Index: noIndex, Phase: pod.Status.Phase}
		if pod.Annotations != nil {
			ip.Index = pod.Annotations[batch.JobCompletionIndexAnnotationAlpha]
		}
		result = append(result, ip)
	}
	return result
}

View File

@ -23,16 +23,17 @@ import (
"reflect" "reflect"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
batch "k8s.io/api/batch/v1" batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
batchinformers "k8s.io/client-go/informers/batch/v1" batchinformers "k8s.io/client-go/informers/batch/v1"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -46,6 +47,7 @@ import (
"k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/integer" "k8s.io/utils/integer"
) )
@ -453,7 +455,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
} }
sharedJob, err := jm.jobLister.Jobs(ns).Get(name) sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if apierrors.IsNotFound(err) {
klog.V(4).Infof("Job has been deleted: %v", key) klog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key) jm.expectations.DeleteExpectations(key)
return true, nil return true, nil
@ -467,6 +469,12 @@ func (jm *Controller) syncJob(key string) (bool, error) {
return true, nil return true, nil
} }
// Cannot create Pods if this is an Indexed Job and the feature is disabled.
if !utilfeature.DefaultFeatureGate.Enabled(features.IndexedJob) && job.Spec.CompletionMode == batch.IndexedCompletion {
jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
return false, nil
}
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters // and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist. // the store after we've checked the expectation, the job sync is just deferred till the next relist.
@ -476,10 +484,9 @@ func (jm *Controller) syncJob(key string) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
activePods := controller.FilterActivePods(pods) activePods := controller.FilterActivePods(pods)
active := int32(len(activePods)) active := int32(len(activePods))
succeeded, failed := getStatus(pods) succeeded, failed := getStatus(&job, pods)
conditions := len(job.Status.Conditions) conditions := len(job.Status.Conditions)
// job first start // job first start
if job.Status.StartTime == nil { if job.Status.StartTime == nil {
@ -518,15 +525,9 @@ func (jm *Controller) syncJob(key string) (bool, error) {
} }
if jobFailed { if jobFailed {
errCh := make(chan error, active) // TODO(#28486): Account for pod failures in status once we can track
jm.deleteJobPods(&job, activePods, errCh) // completions without lingering pods.
select { _, manageJobErr = jm.deleteJobPods(&job, "", activePods)
case manageJobErr = <-errCh:
if manageJobErr != nil {
break
}
default:
}
// update status values accordingly // update status values accordingly
failed += active failed += active
@ -535,7 +536,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else { } else {
if jobNeedsSync && job.DeletionTimestamp == nil { if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(activePods, succeeded, &job) active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
} }
completions := succeeded completions := succeeded
complete := false complete := false
@ -586,6 +587,9 @@ func (jm *Controller) syncJob(key string) (bool, error) {
job.Status.Active = active job.Status.Active = active
job.Status.Succeeded = succeeded job.Status.Succeeded = succeeded
job.Status.Failed = failed job.Status.Failed = failed
if job.Spec.CompletionMode == batch.IndexedCompletion {
job.Status.CompletedIndexes = calculateCompletedIndexesStr(pods)
}
if err := jm.updateHandler(&job); err != nil { if err := jm.updateHandler(&job); err != nil {
return forget, err return forget, err
@ -602,27 +606,32 @@ func (jm *Controller) syncJob(key string) (bool, error) {
return forget, manageJobErr return forget, manageJobErr
} }
func (jm *Controller) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) { // deleteJobPods deletes the pods, returns the number of successful removals
// TODO: below code should be replaced with pod termination resulting in // and any error.
// pod failures, rather than killing pods. Unfortunately none such solution func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
// exists ATM. There's an open discussion in the topic in errCh := make(chan error, len(pods))
// https://github.com/kubernetes/kubernetes/issues/14602 which might give successfulDeletes := int32(len(pods))
// some sort of solution to above problem. wg := sync.WaitGroup{}
// kill remaining active pods wg.Add(len(pods))
wait := sync.WaitGroup{} for i := range pods {
nbPods := len(pods) go func(pod *v1.Pod) {
wait.Add(nbPods) defer wg.Done()
for i := int32(0); i < int32(nbPods); i++ { if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil {
go func(ix int32) { // Decrement the expected number of deletes because the informer won't observe this deletion
defer wait.Done() if jobKey != "" {
if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil && !apierrors.IsNotFound(err) { jm.expectations.DeletionObserved(jobKey)
defer utilruntime.HandleError(err) }
klog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name) if !apierrors.IsNotFound(err) {
errCh <- err klog.V(2).Infof("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
atomic.AddInt32(&successfulDeletes, -1)
errCh <- err
utilruntime.HandleError(err)
}
} }
}(i) }(pods[i])
} }
wait.Wait() wg.Wait()
return successfulDeletes, errorFromChannel(errCh)
} }
// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit // pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
@ -675,17 +684,16 @@ func newCondition(conditionType batch.JobConditionType, reason, message string)
} }
// getStatus returns no of succeeded and failed pods running a job // getStatus returns no of succeeded and failed pods running a job
func getStatus(pods []*v1.Pod) (succeeded, failed int32) { func getStatus(job *batch.Job, pods []*v1.Pod) (succeeded, failed int32) {
succeeded = int32(filterPods(pods, v1.PodSucceeded)) succeeded = int32(countPodsByPhase(job, pods, v1.PodSucceeded))
failed = int32(filterPods(pods, v1.PodFailed)) failed = int32(countPodsByPhase(job, pods, v1.PodFailed))
return return
} }
// manageJob is the core method responsible for managing the number of running // manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec. // pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>. // Does NOT modify <activePods>.
func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) { func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, error) {
var activeLock sync.Mutex
active := int32(len(activePods)) active := int32(len(activePods))
parallelism := *job.Spec.Parallelism parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job) jobKey, err := controller.KeyFunc(job)
@ -694,41 +702,22 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
return 0, nil return 0, nil
} }
var errCh chan error rmAtLeast := active - parallelism
if active > parallelism { if rmAtLeast < 0 {
diff := active - parallelism rmAtLeast = 0
errCh = make(chan error, diff) }
jm.expectations.ExpectDeletions(jobKey, int(diff)) podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) if len(podsToDelete) > 0 {
// Sort the pods in the order such that not-ready < ready, unscheduled jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
// < scheduled, and pending < running. This ensures that we delete pods klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
// in the earlier stages whenever possible. removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
sort.Sort(controller.ActivePods(activePods)) active -= removed
if err != nil {
active -= diff return active, err
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i < diff; i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
jm.expectations.DeletionObserved(jobKey)
if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
activeLock.Lock()
active++
activeLock.Unlock()
errCh <- err
utilruntime.HandleError(err)
}
}
}(i)
} }
wait.Wait() }
} else if active < parallelism { if active < parallelism {
wantActive := int32(0) wantActive := int32(0)
if job.Spec.Completions == nil { if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active // Job does not specify a number of completions. Therefore, number active
@ -755,13 +744,25 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
if diff == 0 { if diff == 0 {
return active, nil return active, nil
} }
jm.expectations.ExpectCreations(jobKey, int(diff)) jm.expectations.ExpectCreations(jobKey, int(diff))
errCh = make(chan error, diff) errCh := make(chan error, diff)
klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
active += diff
wait := sync.WaitGroup{} wait := sync.WaitGroup{}
var indexesToAdd []int
if job.Spec.Completions != nil && job.Spec.CompletionMode == batch.IndexedCompletion {
indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions))
diff = int32(len(indexesToAdd))
}
active += diff
podTemplate := job.Spec.Template.DeepCopy()
if job.Spec.CompletionMode == batch.IndexedCompletion {
addCompletionIndexEnvVariables(podTemplate)
}
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start". // and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would // This handles attempts to start large numbers of pods that would
@ -774,11 +775,21 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
errorCount := len(errCh) errorCount := len(errCh)
wait.Add(int(batchSize)) wait.Add(int(batchSize))
for i := int32(0); i < batchSize; i++ { for i := int32(0); i < batchSize; i++ {
completionIndex := unknownCompletionIndex
if indexesToAdd != nil {
completionIndex = indexesToAdd[0]
indexesToAdd = indexesToAdd[1:]
}
go func() { go func() {
template := podTemplate
if completionIndex != unknownCompletionIndex {
template = podTemplate.DeepCopy()
addCompletionIndexAnnotation(template, completionIndex)
}
defer wait.Done() defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind)) err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil { if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore // If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail. // this error since all subsequent creations will fail.
return return
@ -789,9 +800,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
// Decrement the expected number of creates because the informer won't observe this pod // Decrement the expected number of creates because the informer won't observe this pod
klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey) jm.expectations.CreationObserved(jobKey)
activeLock.Lock() atomic.AddInt32(&active, -1)
active--
activeLock.Unlock()
errCh <- err errCh <- err
} }
}() }()
@ -812,20 +821,38 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
} }
diff -= batchSize diff -= batchSize
} }
} if err := errorFromChannel(errCh); err != nil {
select {
case err := <-errCh:
// all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
if err != nil {
return active, err return active, err
} }
default:
} }
return active, nil return active, nil
} }
// activePodsForRemoval returns Pods that should be removed because there
// are too many pods running or, if this is an indexed job, there are repeated
// indexes or some pods don't have indexes.
// Sorts candidate pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.Pod {
var rm, left []*v1.Pod
if job.Spec.CompletionMode == batch.IndexedCompletion {
rm = make([]*v1.Pod, 0, rmAtLeast)
left = make([]*v1.Pod, 0, len(pods)-rmAtLeast)
rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods)
} else {
left = pods
}
if len(rm) < rmAtLeast {
sort.Sort(controller.ActivePods(left))
rm = append(rm, left[:rmAtLeast-len(rm)]...)
}
return rm
}
func (jm *Controller) updateJobStatus(job *batch.Job) error { func (jm *Controller) updateJobStatus(job *batch.Job) error {
jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace) jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
var err error var err error
@ -864,13 +891,22 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
return calculated return calculated
} }
// filterPods returns pods based on their phase. // countPodsByPhase returns pods based on their phase.
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int { func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int {
result := 0 result := 0
for i := range pods { for _, p := range pods {
if phase == pods[i].Status.Phase { if phase == p.Status.Phase && (job.Spec.CompletionMode != batch.IndexedCompletion || getCompletionIndex(p.Annotations) != unknownCompletionIndex) {
result++ result++
} }
} }
return result return result
} }
func errorFromChannel(errCh <-chan error) error {
select {
case err := <-errCh:
return err
default:
}
return nil
}

View File

@ -22,15 +22,18 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1" batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
@ -38,14 +41,16 @@ import (
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
) )
var alwaysReady = func() bool { return true } var alwaysReady = func() bool { return true }
func newJob(parallelism, completions, backoffLimit int32) *batch.Job { func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job {
j := &batch.Job{ j := &batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"}, TypeMeta: metav1.TypeMeta{Kind: "Job"},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -57,6 +62,7 @@ func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
Selector: &metav1.LabelSelector{ Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"}, MatchLabels: map[string]string{"foo": "bar"},
}, },
CompletionMode: completionMode,
Template: v1.PodTemplateSpec{ Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{ Labels: map[string]string{
@ -133,17 +139,31 @@ func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, acti
} }
} }
func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) {
for _, s := range status {
p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job)
p.Status = v1.PodStatus{Phase: s.Phase}
if s.Index != noIndex {
p.Annotations = map[string]string{
batch.JobCompletionIndexAnnotationAlpha: s.Index,
}
}
podIndexer.Add(p)
}
}
func TestControllerSyncJob(t *testing.T) { func TestControllerSyncJob(t *testing.T) {
jobConditionComplete := batch.JobComplete jobConditionComplete := batch.JobComplete
jobConditionFailed := batch.JobFailed jobConditionFailed := batch.JobFailed
testCases := map[string]struct { testCases := map[string]struct {
// job setup // job setup
parallelism int32 parallelism int32
completions int32 completions int32
backoffLimit int32 backoffLimit int32
deleting bool deleting bool
podLimit int podLimit int
completionMode batch.CompletionMode
// pod setup // pod setup
podControllerError error podControllerError error
@ -152,15 +172,21 @@ func TestControllerSyncJob(t *testing.T) {
activePods int32 activePods int32
succeededPods int32 succeededPods int32
failedPods int32 failedPods int32
podsWithIndexes []indexPhase
// expectations // expectations
expectedCreations int32 expectedCreations int32
expectedDeletions int32 expectedDeletions int32
expectedActive int32 expectedActive int32
expectedSucceeded int32 expectedSucceeded int32
expectedCompletedIdxs string
expectedFailed int32 expectedFailed int32
expectedCondition *batch.JobConditionType expectedCondition *batch.JobConditionType
expectedConditionReason string expectedConditionReason string
expectedCreatedIndexes sets.Int
// features
indexedJobEnabled bool
}{ }{
"job start": { "job start": {
parallelism: 2, parallelism: 2,
@ -376,25 +402,106 @@ func TestControllerSyncJob(t *testing.T) {
expectedCondition: &jobConditionFailed, expectedCondition: &jobConditionFailed,
expectedConditionReason: "BackoffLimitExceeded", expectedConditionReason: "BackoffLimitExceeded",
}, },
"indexed job start": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: true,
expectedCreations: 2,
expectedActive: 2,
expectedCreatedIndexes: sets.NewInt(0, 1),
indexedJobEnabled: true,
},
"indexed job some running and completed pods": {
parallelism: 8,
completions: 20,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
podsWithIndexes: []indexPhase{
{"0", v1.PodRunning},
{"2", v1.PodSucceeded},
{"3", v1.PodPending},
{"4", v1.PodSucceeded},
{"5", v1.PodSucceeded},
{"7", v1.PodSucceeded},
{"8", v1.PodSucceeded},
{"9", v1.PodSucceeded},
},
jobKeyForget: true,
expectedCreations: 6,
expectedActive: 8,
expectedSucceeded: 6,
expectedCompletedIdxs: "2,4,5,7-9",
expectedCreatedIndexes: sets.NewInt(1, 6, 10, 11, 12, 13),
indexedJobEnabled: true,
},
"indexed job some failed pods": {
parallelism: 3,
completions: 4,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
podsWithIndexes: []indexPhase{
{"0", v1.PodFailed},
{"1", v1.PodPending},
{"2", v1.PodFailed},
},
expectedCreations: 2,
expectedActive: 3,
expectedFailed: 2,
expectedCreatedIndexes: sets.NewInt(0, 2),
indexedJobEnabled: true,
},
"indexed job some pods without index": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
activePods: 1,
succeededPods: 1,
failedPods: 1,
podsWithIndexes: []indexPhase{
{"invalid", v1.PodRunning},
{"invalid", v1.PodSucceeded},
{"0", v1.PodSucceeded},
{"1", v1.PodRunning},
{"2", v1.PodRunning},
},
jobKeyForget: true,
expectedDeletions: 2,
expectedActive: 2,
expectedSucceeded: 1,
expectedCompletedIdxs: "0",
indexedJobEnabled: true,
},
"indexed job feature disabled": {
parallelism: 2,
completions: 3,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
podsWithIndexes: []indexPhase{
{"0", v1.PodRunning},
{"1", v1.PodSucceeded},
},
// No status updates.
indexedJobEnabled: false,
},
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, tc.indexedJobEnabled)()
// job manager setup // job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job
manager.updateHandler = func(job *batch.Job) error {
actual = job
return nil
}
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
if tc.deleting { if tc.deleting {
now := metav1.Now() now := metav1.Now()
job.DeletionTimestamp = &now job.DeletionTimestamp = &now
@ -402,6 +509,13 @@ func TestControllerSyncJob(t *testing.T) {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods) setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods)
setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
actual := job
manager.updateHandler = func(job *batch.Job) error {
actual = job
return nil
}
// run // run
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(testutil.GetKey(job, t))
@ -411,7 +525,7 @@ func TestControllerSyncJob(t *testing.T) {
if err == nil { if err == nil {
t.Error("Syncing jobs expected to return error on podControl exception") t.Error("Syncing jobs expected to return error on podControl exception")
} }
} else if tc.failedPods > 0 && tc.expectedCondition == nil { } else if tc.expectedCondition == nil && (hasFailingPods(tc.podsWithIndexes) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
if err == nil { if err == nil {
t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
} }
@ -429,6 +543,9 @@ func TestControllerSyncJob(t *testing.T) {
if int32(len(fakePodControl.Templates)) != tc.expectedCreations { if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
} }
if tc.completionMode == batch.IndexedCompletion {
checkCompletionIndexesInPods(t, &fakePodControl, tc.expectedCreatedIndexes)
}
if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName))
} }
@ -461,10 +578,13 @@ func TestControllerSyncJob(t *testing.T) {
if actual.Status.Succeeded != tc.expectedSucceeded { if actual.Status.Succeeded != tc.expectedSucceeded {
t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
} }
if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" {
t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
}
if actual.Status.Failed != tc.expectedFailed { if actual.Status.Failed != tc.expectedFailed {
t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed) t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
} }
if actual.Status.StartTime == nil { if actual.Status.StartTime == nil && tc.indexedJobEnabled {
t.Error("Missing .status.startTime") t.Error("Missing .status.startTime")
} }
// validate conditions // validate conditions
@ -483,6 +603,23 @@ func TestControllerSyncJob(t *testing.T) {
} }
} }
// checkCompletionIndexesInPods verifies that the pod templates recorded by
// control carry exactly the wanted set of completion indexes and that every
// template exposes its index through the downward-API env variable.
func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int) {
	t.Helper()
	gotIndexes := sets.NewInt()
	for _, p := range control.Templates {
		checkJobCompletionEnvVariable(t, &p.Spec)
		ix := getCompletionIndex(p.Annotations)
		// Use the package constant rather than the magic literal -1 so this
		// stays in sync with getCompletionIndex's sentinel value.
		if ix == unknownCompletionIndex {
			t.Errorf("Created pod %s didn't have completion index", p.Name)
		} else {
			gotIndexes.Insert(ix)
		}
	}
	if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" {
		t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff)
	}
}
func TestSyncJobPastDeadline(t *testing.T) { func TestSyncJobPastDeadline(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// job setup // job setup
@ -568,7 +705,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
} }
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0) start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
job.Status.StartTime = &start job.Status.StartTime = &start
@ -634,7 +771,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
return nil return nil
} }
job := newJob(1, 1, 6) job := newJob(1, 1, 6, batch.NonIndexedCompletion)
activeDeadlineSeconds := int64(10) activeDeadlineSeconds := int64(10)
job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
@ -667,7 +804,7 @@ func TestSyncJobComplete(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
job := newJob(1, 1, 6) job := newJob(1, 1, 6, batch.NonIndexedCompletion)
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(testutil.GetKey(job, t))
@ -695,7 +832,7 @@ func TestSyncJobDeleted(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil } manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2, 6) job := newJob(2, 2, 6, batch.NonIndexedCompletion)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err) t.Errorf("Unexpected error when syncing jobs %v", err)
@ -724,7 +861,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
manager.queue.AddRateLimited(testutil.GetKey(job, t)) manager.queue.AddRateLimited(testutil.GetKey(job, t))
return updateError return updateError
} }
job := newJob(2, 2, 6) job := newJob(2, 2, 6, batch.NonIndexedCompletion)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(testutil.GetKey(job, t))
if err == nil || err != updateError { if err == nil || err != updateError {
@ -829,9 +966,9 @@ func TestGetPodsForJob(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -870,7 +1007,7 @@ func TestGetPodsForJob(t *testing.T) {
} }
func TestGetPodsForJobAdopt(t *testing.T) { func TestGetPodsForJobAdopt(t *testing.T) {
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
clientset := fake.NewSimpleClientset(job1) clientset := fake.NewSimpleClientset(job1)
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
@ -896,7 +1033,7 @@ func TestGetPodsForJobAdopt(t *testing.T) {
} }
func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job1.DeletionTimestamp = &metav1.Time{} job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1) clientset := fake.NewSimpleClientset(job1)
@ -926,7 +1063,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
} }
func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
// The up-to-date object says it's being deleted. // The up-to-date object says it's being deleted.
job1.DeletionTimestamp = &metav1.Time{} job1.DeletionTimestamp = &metav1.Time{}
@ -965,7 +1102,7 @@ func TestGetPodsForJobRelease(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -994,9 +1131,9 @@ func TestAddPod(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1039,11 +1176,11 @@ func TestAddPodOrphan(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
job3 := newJob(1, 1, 6) job3 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job3.Name = "job3" job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -1067,9 +1204,9 @@ func TestUpdatePod(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1116,9 +1253,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1143,9 +1280,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1169,9 +1306,9 @@ func TestUpdatePodRelease(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1195,9 +1332,9 @@ func TestDeletePod(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1240,11 +1377,11 @@ func TestDeletePodOrphan(t *testing.T) {
jm.podStoreSynced = alwaysReady jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1, 6) job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1" job1.Name = "job1"
job2 := newJob(1, 1, 6) job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job2.Name = "job2" job2.Name = "job2"
job3 := newJob(1, 1, 6) job3 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job3.Name = "job3" job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"} job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -1283,7 +1420,7 @@ func TestSyncJobExpectations(t *testing.T) {
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil } manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2, 6) job := newJob(2, 2, 6, batch.NonIndexedCompletion)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := newPodList(2, v1.PodPending, job) pods := newPodList(2, v1.PodPending, job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@ -1351,7 +1488,7 @@ func TestWatchJobs(t *testing.T) {
} }
func TestWatchPods(t *testing.T) { func TestWatchPods(t *testing.T) {
testJob := newJob(2, 2, 6) testJob := newJob(2, 2, 6, batch.NonIndexedCompletion)
clientset := fake.NewSimpleClientset(testJob) clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake() fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
@ -1450,7 +1587,7 @@ func TestJobBackoffReset(t *testing.T) {
} }
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
key := testutil.GetKey(job, t) key := testutil.GetKey(job, t)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@ -1499,7 +1636,7 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio
} }
func TestJobBackoff(t *testing.T) { func TestJobBackoff(t *testing.T) {
job := newJob(1, 1, 1) job := newJob(1, 1, 1, batch.NonIndexedCompletion)
oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
oldPod.Status.Phase = v1.PodRunning oldPod.Status.Phase = v1.PodRunning
oldPod.ResourceVersion = "1" oldPod.ResourceVersion = "1"
@ -1630,7 +1767,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
} }
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@ -1732,7 +1869,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
} }
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@ -1769,3 +1906,36 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
}) })
} }
} }
// checkJobCompletionEnvVariable verifies that every container in spec — init
// and regular alike — has exactly one env variable that surfaces the
// completion index annotation via the downward API.
func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
	t.Helper()
	// Reference the package constant instead of repeating the literal
	// "JOB_COMPLETION_INDEX" so the test tracks the controller's definition.
	want := []v1.EnvVar{
		{
			Name: completionIndexEnvName,
			ValueFrom: &v1.EnvVarSource{
				FieldRef: &v1.ObjectFieldSelector{
					FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha),
				},
			},
		},
	}
	for _, c := range spec.InitContainers {
		// Distinguish init containers in the failure message; previously both
		// loops produced identical, ambiguous output.
		if diff := cmp.Diff(want, c.Env); diff != "" {
			t.Errorf("Unexpected Env in init container %s (-want,+got):\n%s", c.Name, diff)
		}
	}
	for _, c := range spec.Containers {
		if diff := cmp.Diff(want, c.Env); diff != "" {
			t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff)
		}
	}
}
// hasFailingPods reports whether at least one entry in status is in the
// Failed pod phase.
func hasFailingPods(status []indexPhase) bool {
	for i := range status {
		if status[i].Phase == v1.PodFailed {
			return true
		}
	}
	return false
}