Decouple batch/job back-off logic from workqueues (#114768)

* batch/job: decouple backoff from workqueue

Signed-off-by: Sathyanarayanan Saravanamuthu <sathyanarays@vmware.com>

* Resolving review comments

* Resolving more review comments

* Resolving review comments

Signed-off-by: Sathyanarayanan Saravanamuthu <sathyanarays@vmware.com>

* Defaulting the finish time to now when FinishedAt is the Unix epoch

* Addressing review comments

Signed-off-by: Sathyanarayanan Saravanamuthu <sathyanarays@vmware.com>

---------

Signed-off-by: Sathyanarayanan Saravanamuthu <sathyanarays@vmware.com>
Sathyanarayanan Saravanamuthu 2023-03-16 22:45:21 +05:30 committed by GitHub
parent 856d6d9caa
commit c84c8add70
5 changed files with 917 additions and 136 deletions
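
In short, the Job controller now keeps a per-Job back-off record of its own instead of relying on the workqueue's rate limiter, and uses that record to delay pod creation after failures. Below is a minimal sketch of the calculation, assuming the semantics of backoffRecord.getRemainingTime added in this change; the helper name and the sample values are illustrative only, not part of the patch.

package main

import (
    "fmt"
    "time"
)

// remainingBackoff doubles defaultBackoff once per failure beyond the first,
// caps the result at maxBackoff, and subtracts the time already elapsed
// since the last failure (never returning a negative duration).
func remainingBackoff(failuresAfterLastSuccess int32, sinceLastFailure, defaultBackoff, maxBackoff time.Duration) time.Duration {
    if failuresAfterLastSuccess == 0 {
        return 0
    }
    d := defaultBackoff
    for i := int32(1); i < failuresAfterLastSuccess; i++ {
        d *= 2
        if d >= maxBackoff {
            d = maxBackoff
            break
        }
    }
    if d < sinceLastFailure {
        return 0
    }
    return d - sinceLastFailure
}

func main() {
    // Three failures with a 5s base and 700s cap give a 20s window;
    // 1s has already elapsed, so the Job is re-enqueued after 19s.
    fmt.Println(remainingBackoff(3, time.Second, 5*time.Second, 700*time.Second))
}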


@ -0,0 +1,205 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"sort"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
)
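// backoffRecord tracks, for a single Job identified by key, how many pod failures
// have been observed since the last pod success and when the most recent of those
// failures finished.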
type backoffRecord struct {
key string
failuresAfterLastSuccess int32
lastFailureTime *time.Time
}
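// backoffStore caches backoffRecords across syncs, keyed by the Job key.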
type backoffStore struct {
store cache.Store
}
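// updateBackoffRecord adds the record to the store if no entry exists for its key;
// otherwise it overwrites the failure count and last failure time of the existing entry.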
func (s *backoffStore) updateBackoffRecord(record backoffRecord) error {
b, ok, err := s.store.GetByKey(record.key)
if err != nil {
return err
}
if !ok {
err = s.store.Add(&record)
if err != nil {
return err
}
} else {
backoffRecord := b.(*backoffRecord)
backoffRecord.failuresAfterLastSuccess = record.failuresAfterLastSuccess
backoffRecord.lastFailureTime = record.lastFailureTime
}
return nil
}
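// removeBackoffRecord deletes the cached record for the given Job key, if present;
// the controller calls it once a Job is finished or deleted.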
func (s *backoffStore) removeBackoffRecord(jobId string) error {
b, ok, err := s.store.GetByKey(jobId)
if err != nil {
return err
}
if ok {
err = s.store.Delete(b)
if err != nil {
return err
}
}
return nil
}
func newBackoffRecordStore() *backoffStore {
return &backoffStore{
store: cache.NewStore(backoffRecordKeyFunc),
}
}
var backoffRecordKeyFunc = func(obj interface{}) (string, error) {
if u, ok := obj.(*backoffRecord); ok {
return u.key, nil
}
return "", fmt.Errorf("could not find key for obj %#v", obj)
}
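// newBackoffRecord returns an updated copy of the stored record for key, folding in
// the newly finished pods: a new success resets the failure count, and only failures
// that finished after the most recent success are counted.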
func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker, key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
now := clock.Now()
var backoff *backoffRecord
if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists {
old := b.(*backoffRecord)
backoff = &backoffRecord{
key: old.key,
failuresAfterLastSuccess: old.failuresAfterLastSuccess,
lastFailureTime: old.lastFailureTime,
}
} else {
backoff = &backoffRecord{
key: key,
failuresAfterLastSuccess: 0,
lastFailureTime: nil,
}
}
sortByFinishedTime(newSucceededPods, now)
sortByFinishedTime(newFailedPods, now)
if len(newSucceededPods) == 0 {
if len(newFailedPods) == 0 {
return *backoff
}
backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods))
lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1], now)
backoff.lastFailureTime = &lastFailureTime
return *backoff
} else {
if len(newFailedPods) == 0 {
backoff.failuresAfterLastSuccess = 0
backoff.lastFailureTime = nil
return *backoff
}
backoff.failuresAfterLastSuccess = 0
backoff.lastFailureTime = nil
lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1], now)
for i := len(newFailedPods) - 1; i >= 0; i-- {
failedTime := getFinishedTime(newFailedPods[i], now)
if !failedTime.After(lastSuccessTime) {
break
}
if backoff.lastFailureTime == nil {
backoff.lastFailureTime = &failedTime
}
backoff.failuresAfterLastSuccess += 1
}
return *backoff
}
}
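// sortByFinishedTime orders pods by finish time (ascending), treating pods without a
// recorded finish time as if they finished at currentTime.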
func sortByFinishedTime(pods []*v1.Pod, currentTime time.Time) {
sort.Slice(pods, func(i, j int) bool {
p1 := pods[i]
p2 := pods[j]
p1FinishTime := getFinishedTime(p1, currentTime)
p2FinishTime := getFinishedTime(p2, currentTime)
return p1FinishTime.Before(p2FinishTime)
})
}
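// getFinishedTime returns the latest container termination time of the pod, falling back
// to currentTime when any container has not terminated or the recorded FinishedAt is zero.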
func getFinishedTime(p *v1.Pod, currentTime time.Time) time.Time {
var finishTime *time.Time
for _, containerState := range p.Status.ContainerStatuses {
if containerState.State.Terminated == nil {
finishTime = nil
break
}
if finishTime == nil {
finishTime = &containerState.State.Terminated.FinishedAt.Time
} else {
if finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
finishTime = &containerState.State.Terminated.FinishedAt.Time
}
}
}
if finishTime == nil || finishTime.IsZero() {
return currentTime
}
return *finishTime
}
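// getRemainingTime returns how much longer pod creation for this Job should be delayed:
// defaultBackoff doubled once per failure beyond the first, capped at maxBackoff, minus
// the time already elapsed since the last failure (never negative).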
func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
if backoff.failuresAfterLastSuccess == 0 {
return 0
}
backoffDuration := defaultBackoff
for i := 1; i < int(backoff.failuresAfterLastSuccess); i++ {
backoffDuration = backoffDuration * 2
if backoffDuration >= maxBackoff {
backoffDuration = maxBackoff
break
}
}
timeElapsedSinceLastFailure := clock.Since(*backoff.lastFailureTime)
if backoffDuration < timeElapsedSinceLastFailure {
return 0
}
return backoffDuration - timeElapsedSinceLastFailure
}


@ -0,0 +1,395 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clocktesting "k8s.io/utils/clock/testing"
)
func TestNewBackoffRecord(t *testing.T) {
emptyStoreInitializer := func(*backoffStore) {}
defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))
testCases := map[string]struct {
storeInitializer func(*backoffStore)
uncounted uncountedTerminatedPods
newSucceededPods []metav1.Time
newFailedPods []metav1.Time
wantBackoffRecord backoffRecord
}{
"Empty backoff store and one new failure": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{},
newFailedPods: []metav1.Time{
defaultTestTime,
},
wantBackoffRecord: backoffRecord{
key: "key",
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 1,
},
},
"Empty backoff store and two new failures": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{},
newFailedPods: []metav1.Time{
defaultTestTime,
metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)),
},
wantBackoffRecord: backoffRecord{
key: "key",
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 2,
},
},
"Empty backoff store, two failures followed by success": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{
defaultTestTime,
},
newFailedPods: []metav1.Time{
metav1.NewTime(defaultTestTime.Add(-2 * time.Millisecond)),
metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)),
},
wantBackoffRecord: backoffRecord{
key: "key",
failuresAfterLastSuccess: 0,
},
},
"Empty backoff store, two failures, one success and two more failures": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{
metav1.NewTime(defaultTestTime.Add(-2 * time.Millisecond)),
},
newFailedPods: []metav1.Time{
defaultTestTime,
metav1.NewTime(defaultTestTime.Add(-4 * time.Millisecond)),
metav1.NewTime(defaultTestTime.Add(-3 * time.Millisecond)),
metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)),
},
wantBackoffRecord: backoffRecord{
key: "key",
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 2,
},
},
"Backoff store having failure count 2 and one new failure": {
storeInitializer: func(bis *backoffStore) {
bis.updateBackoffRecord(backoffRecord{
key: "key",
failuresAfterLastSuccess: 2,
lastFailureTime: nil,
})
},
newSucceededPods: []metav1.Time{},
newFailedPods: []metav1.Time{
defaultTestTime,
},
wantBackoffRecord: backoffRecord{
key: "key",
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 3,
},
},
"Empty backoff store with success and failure at same timestamp": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{
defaultTestTime,
},
newFailedPods: []metav1.Time{
defaultTestTime,
},
wantBackoffRecord: backoffRecord{
key: "key",
failuresAfterLastSuccess: 0,
},
},
"Empty backoff store with no success/failure": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{},
newFailedPods: []metav1.Time{},
wantBackoffRecord: backoffRecord{
key: "key",
failuresAfterLastSuccess: 0,
},
},
"Empty backoff store with one success": {
storeInitializer: emptyStoreInitializer,
newSucceededPods: []metav1.Time{
defaultTestTime,
},
newFailedPods: []metav1.Time{},
wantBackoffRecord: backoffRecord{
key: "key",
failuresAfterLastSuccess: 0,
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
backoffRecordStore := newBackoffRecordStore()
tc.storeInitializer(backoffRecordStore)
newSucceededPods := []*v1.Pod{}
newFailedPods := []*v1.Pod{}
for _, finishTime := range tc.newSucceededPods {
newSucceededPods = append(newSucceededPods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: finishTime,
},
},
},
},
},
})
}
for _, finishTime := range tc.newFailedPods {
newFailedPods = append(newFailedPods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{},
Status: v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: finishTime,
},
},
},
},
},
})
}
fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
backoffRecord := backoffRecordStore.newBackoffRecord(fakeClock, "key", newSucceededPods, newFailedPods)
if diff := cmp.Diff(tc.wantBackoffRecord, backoffRecord, cmp.AllowUnexported(backoffRecord)); diff != "" {
t.Errorf("backoffRecord not matching; (-want,+got): %v", diff)
}
})
}
}
func TestGetFinishedTime(t *testing.T) {
defaultTestTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
testCases := map[string]struct {
pod v1.Pod
wantFinishTime time.Time
}{
"Pod with multiple containers and all containers terminated": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-1 * time.Second))},
},
},
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime)},
},
},
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-2 * time.Second))},
},
},
},
},
},
wantFinishTime: defaultTestTime,
},
"Pod with multiple containers; two containers in terminated state and one in running state": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-1 * time.Second))},
},
},
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-2 * time.Second))},
},
},
},
},
},
wantFinishTime: defaultTestTime,
},
"Pod with single container in running state": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
},
},
},
wantFinishTime: defaultTestTime,
},
"Pod with single container with zero finish time": {
pod: v1.Pod{
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
},
},
},
wantFinishTime: defaultTestTime,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
f := getFinishedTime(&tc.pod, defaultTestTime)
if !f.Equal(tc.wantFinishTime) {
t.Errorf("Expected value of finishedTime %v; got %v", tc.wantFinishTime, f)
}
})
}
}
func TestGetRemainingBackoffTime(t *testing.T) {
defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC))
testCases := map[string]struct {
backoffRecord backoffRecord
currentTime time.Time
maxBackoff time.Duration
defaultBackoff time.Duration
wantDuration time.Duration
}{
"no failures": {
backoffRecord: backoffRecord{
lastFailureTime: nil,
failuresAfterLastSuccess: 0,
},
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 0 * time.Second,
},
"one failure; current time and failure time are same": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 1,
},
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 5 * time.Second,
},
"one failure; current time == 1 second + failure time": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 1,
},
currentTime: defaultTestTime.Time.Add(time.Second),
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 4 * time.Second,
},
"one failure; current time == expected backoff time": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 1,
},
currentTime: defaultTestTime.Time.Add(5 * time.Second),
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 0 * time.Second,
},
"one failure; current time == expected backoff time + 1 Second": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 1,
},
currentTime: defaultTestTime.Time.Add(6 * time.Second),
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 0 * time.Second,
},
"three failures; current time and failure time are same": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 3,
},
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 20 * time.Second,
},
"eight failures; current time and failure time are same; backoff not exceeding maxBackoff": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 8,
},
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 640 * time.Second,
},
"nine failures; current time and failure time are same; backoff exceeding maxBackoff": {
backoffRecord: backoffRecord{
lastFailureTime: &defaultTestTime.Time,
failuresAfterLastSuccess: 9,
},
currentTime: defaultTestTime.Time,
defaultBackoff: 5 * time.Second,
maxBackoff: 700 * time.Second,
wantDuration: 700 * time.Second,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
fakeClock := clocktesting.NewFakeClock(tc.currentTime.Truncate(time.Second))
d := tc.backoffRecord.getRemainingTime(fakeClock, tc.defaultBackoff, tc.maxBackoff)
if d.Seconds() != tc.wantDuration.Seconds() {
t.Errorf("Expected value of duration %v; got %v", tc.wantDuration, d)
}
})
}
}


@ -93,7 +93,7 @@ type Controller struct {
// To allow injection of the following for testing.
updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error)
patchJobHandler func(ctx context.Context, job *batch.Job, patch []byte) error
syncHandler func(ctx context.Context, jobKey string) (bool, error)
syncHandler func(ctx context.Context, jobKey string) error
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
@ -126,6 +126,8 @@ type Controller struct {
podUpdateBatchPeriod time.Duration
clock clock.WithTicker
backoffRecordStore *backoffStore
}
// NewController creates a new Job controller that keeps the relevant pods
@ -150,6 +152,7 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b
broadcaster: eventBroadcaster,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
clock: clock,
backoffRecordStore: newBackoffRecordStore(),
}
if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
jm.podUpdateBatchPeriod = podUpdateBatchPeriod
@ -544,11 +547,9 @@ func (jm *Controller) processNextWorkItem(ctx context.Context) bool {
}
defer jm.queue.Done(key)
forget, err := jm.syncHandler(ctx, key.(string))
err := jm.syncHandler(ctx, key.(string))
if err == nil {
if forget {
jm.queue.Forget(key)
}
jm.queue.Forget(key)
return true
}
@ -669,7 +670,7 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rErr error) {
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
startTime := jm.clock.Now()
defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime))
@ -677,10 +678,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
return err
}
if len(ns) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
}
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
@ -688,21 +689,32 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
klog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key)
jm.finalizerExpectations.deleteExpectations(key)
return true, nil
err := jm.backoffRecordStore.removeBackoffRecord(key)
if err != nil {
// re-syncing here as the record has to be removed for finished/deleted jobs
return fmt.Errorf("error removing backoff record %w", err)
}
return nil
}
return false, err
return err
}
// make a copy so we don't mutate the shared cache
job := *sharedJob.DeepCopy()
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
return true, nil
err := jm.backoffRecordStore.removeBackoffRecord(key)
if err != nil {
// re-syncing here as the record has to be removed for finished/deleted jobs
return fmt.Errorf("error removing backoff record %w", err)
}
return nil
}
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
return false, nil
return nil
}
completionMode := getCompletionMode(&job)
@ -731,12 +743,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
pods, err := jm.getPodsForJob(ctx, &job)
if err != nil {
return false, err
return err
}
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers)
newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
var ready *int32
if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
ready = pointer.Int32(countReadyPods(activePods))
@ -748,6 +762,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
job.Status.StartTime = &now
}
newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(jm.clock, key, newSucceededPods, newFailedPods)
var manageJobErr error
var finishedCondition *batch.JobCondition
@ -799,7 +815,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
} else {
manageJobCalled := false
if satisfiedExpectations && job.DeletionTimestamp == nil {
active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes)
active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffInfo)
manageJobCalled = true
}
complete := false
@ -852,34 +868,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
}
}
// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed.
// In this case, we should clear the backoff delay.
forget = job.Status.Succeeded < succeeded
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
job.Status.Active = active
job.Status.Ready = ready
err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo)
if err != nil {
if apierrors.IsConflict(err) {
// we probably have a stale informer cache
// so don't return an error to avoid backoff
jm.enqueueController(&job, false)
return false, nil
return nil
}
return false, fmt.Errorf("tracking status: %w", err)
return fmt.Errorf("tracking status: %w", err)
}
jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
// returning an error will re-enqueue Job after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
return fmt.Errorf("failed pod(s) detected for job key %q", key)
}
if suspendCondChanged {
forget = true
}
return forget, manageJobErr
return manageJobErr
}
// deleteActivePods issues deletion for active Pods, preserving finalizers.
@ -952,7 +961,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
//
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error {
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods
@ -1064,7 +1073,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
}
}
var err error
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil {
return err
}
jobFinished := jm.enactJobFinished(job, finishedCond)
@ -1094,12 +1103,22 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
var err error
if needsFlush {
if job, err = jm.updateStatusHandler(ctx, job); err != nil {
return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
}
err = jm.backoffRecordStore.updateBackoffRecord(newBackoffRecord)
if err != nil {
// this error might undercount the backoff.
// re-syncing from the current state might not help to recover
// the backoff information
klog.ErrorS(err, "Backoff update failed")
}
recordJobPodFinished(job, *oldCounters)
// Shallow copy, as it will only be used to detect changes in the counters.
*oldCounters = job.Status
@ -1316,15 +1335,13 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
return nil
}
// getStatus returns number of succeeded and failed pods running a job. The number
// of failed pods can be affected by the podFailurePolicy.
func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) {
succeeded = job.Status.Succeeded
failed = job.Status.Failed
succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
// getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
// in the job status. The list of failed pods can be affected by the podFailurePolicy.
func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeededPods, failedPods []*v1.Pod) {
succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
return p.Status.Phase == v1.PodSucceeded
}))
failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
})
failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
if !isPodFailed(p, job) {
return false
@ -1334,8 +1351,8 @@ func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPod
} else {
return isPodFailed(p, job)
}
}))
return succeeded, failed
})
return succeededPods, failedPods
}
// jobSuspended returns whether a Job is suspended while taking the feature
@ -1346,8 +1363,9 @@ func jobSuspended(job *batch.Job) bool {
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Respects back-off; does not create new pods if the back-off time has not passed
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) {
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) {
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
@ -1408,6 +1426,11 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
}
if active < wantActive {
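// If the Job is still within its back-off window, re-enqueue it to be synced again once
// the window elapses instead of creating new pods now.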
remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
if remainingTime > 0 {
jm.enqueueControllerDelayed(job, true, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
diff := wantActive - active
if diff > int32(MaxPodCreateDeletePerSync) {
diff = int32(MaxPodCreateDeletePerSync)
@ -1554,13 +1577,14 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
return calculated
}
// countValidPodsWithFilter returns number of valid pods that pass the filter.
// Pods are valid if they have a finalizer and, for Indexed Jobs, a valid
// completion index.
func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) int {
result := len(uncounted)
// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they have a finalizer or are in the uncounted set
// and, for Indexed Jobs, a valid completion index.
func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
uid := string(p.UID)
// Pods that don't have a completion finalizer are in the uncounted set or
// have already been accounted for in the Job status.
if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) {
@ -1573,7 +1597,7 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str
}
}
if filter(p) {
result++
result = append(result, p)
}
}
return result


@ -62,6 +62,10 @@ import (
var realClock = &clock.RealClock{}
var alwaysReady = func() bool { return true }
// testFinishedAt represents a time one second after the Unix epoch;
// it is used in various test cases where we don't want back-off to kick in
var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second))
func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job {
j := &batch.Job{
TypeMeta: metav1.TypeMeta{Kind: "Job"},
@ -141,6 +145,15 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
for i := 0; i < count; i++ {
newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
newPod.Status = v1.PodStatus{Phase: status}
newPod.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: testFinishedAt,
},
},
},
}
newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
pods = append(pods, newPod)
}
@ -176,6 +189,17 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
for _, s := range status {
p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job)
p.Status = v1.PodStatus{Phase: s.Phase}
if s.Phase == v1.PodFailed || s.Phase == v1.PodSucceeded {
p.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: testFinishedAt,
},
},
},
}
}
if s.Index != noIndex {
p.Annotations = map[string]string{
batch.JobCompletionIndexAnnotation: s.Index,
@ -188,15 +212,17 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
}
type jobInitialStatus struct {
active int
succeed int
failed int
active int
succeed int
failed int
startTime *time.Time
}
func TestControllerSyncJob(t *testing.T) {
jobConditionComplete := batch.JobComplete
jobConditionFailed := batch.JobFailed
jobConditionSuspended := batch.JobSuspended
referenceTime := time.Now()
testCases := map[string]struct {
// job setup
@ -209,6 +235,8 @@ func TestControllerSyncJob(t *testing.T) {
wasSuspended bool
suspend bool
initialStatus *jobInitialStatus
backoffRecord *backoffRecord
controllerTime *time.Time
// pod setup
@ -216,7 +244,6 @@ func TestControllerSyncJob(t *testing.T) {
// This means that there is no status update so the counters for
// failedPods and succeededPods cannot be incremented.
podControllerError error
jobKeyForget bool
pendingPods int
activePods int
readyPods int
@ -246,7 +273,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
expectedCreations: 2,
expectedActive: 2,
},
@ -254,7 +280,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: false,
expectedCreations: 2,
expectedActive: 2,
},
@ -262,7 +287,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
pendingPods: 2,
expectedActive: 2,
},
@ -270,7 +294,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 3,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
activePods: 3,
readyPods: 2,
expectedActive: 3,
@ -279,7 +302,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 3,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
activePods: 3,
readyPods: 2,
expectedActive: 3,
@ -290,7 +312,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: false,
activePods: 2,
expectedActive: 2,
},
@ -298,7 +319,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: true,
activePods: 1,
succeededPods: 1,
expectedCreations: 1,
@ -306,11 +326,48 @@ func TestControllerSyncJob(t *testing.T) {
expectedSucceeded: 1,
expectedPodPatches: 1,
},
"too few active pods and active back-off": {
parallelism: 1,
completions: 1,
backoffLimit: 6,
backoffRecord: &backoffRecord{
failuresAfterLastSuccess: 1,
lastFailureTime: &referenceTime,
},
initialStatus: &jobInitialStatus{
startTime: func() *time.Time {
now := time.Now()
return &now
}(),
},
activePods: 0,
succeededPods: 0,
expectedCreations: 0,
expectedActive: 0,
expectedSucceeded: 0,
expectedPodPatches: 0,
controllerTime: &referenceTime,
},
"too few active pods and no back-offs": {
parallelism: 1,
completions: 1,
backoffLimit: 6,
backoffRecord: &backoffRecord{
failuresAfterLastSuccess: 0,
lastFailureTime: &referenceTime,
},
activePods: 0,
succeededPods: 0,
expectedCreations: 1,
expectedActive: 1,
expectedSucceeded: 0,
expectedPodPatches: 0,
controllerTime: &referenceTime,
},
"too few active pods with a dynamic job": {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: false,
activePods: 1,
expectedCreations: 1,
expectedActive: 2,
@ -320,7 +377,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 5,
backoffLimit: 6,
podControllerError: fmt.Errorf("fake error"),
jobKeyForget: false,
activePods: 1,
succeededPods: 1,
expectedCreations: 1,
@ -332,7 +388,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
activePods: 3,
expectedDeletions: 1,
expectedActive: 2,
@ -343,7 +398,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 5,
backoffLimit: 6,
podControllerError: fmt.Errorf("fake error"),
jobKeyForget: false,
activePods: 3,
expectedDeletions: 0,
expectedPodPatches: 1,
@ -353,7 +407,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: true,
activePods: 1,
succeededPods: 1,
failedPods: 1,
@ -375,10 +428,14 @@ func TestControllerSyncJob(t *testing.T) {
expectedPodPatches: 1,
},
"no new pod; possible finalizer update of failed pod": {
parallelism: 1,
completions: 1,
backoffLimit: 6,
initialStatus: &jobInitialStatus{1, 0, 1},
parallelism: 1,
completions: 1,
backoffLimit: 6,
initialStatus: &jobInitialStatus{
active: 1,
succeed: 0,
failed: 1,
},
activePods: 1,
failedPods: 0,
expectedCreations: 0,
@ -402,7 +459,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: true,
succeededPods: 5,
expectedSucceeded: 5,
expectedCondition: &jobConditionComplete,
@ -413,7 +469,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: true,
activePods: 1,
succeededPods: 1,
expectedActive: 1,
@ -424,7 +479,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: true,
succeededPods: 2,
expectedSucceeded: 2,
expectedCondition: &jobConditionComplete,
@ -435,7 +489,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: -1,
backoffLimit: 6,
jobKeyForget: true,
succeededPods: 1,
failedPods: 1,
expectedSucceeded: 1,
@ -448,7 +501,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: false,
activePods: 10,
expectedDeletions: 8,
expectedActive: 2,
@ -458,7 +510,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 3,
completions: 4,
backoffLimit: 6,
jobKeyForget: true,
activePods: 3,
succeededPods: 2,
expectedDeletions: 1,
@ -470,7 +521,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
jobKeyForget: true,
activePods: 2,
succeededPods: 2,
expectedActive: 2,
@ -482,7 +532,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 5,
backoffLimit: 6,
deleting: true,
jobKeyForget: true,
pendingPods: 1,
activePods: 1,
succeededPods: 1,
@ -495,7 +544,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 200,
backoffLimit: 6,
podLimit: 10,
jobKeyForget: false,
expectedCreations: 10,
expectedActive: 10,
},
@ -503,7 +551,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
deleting: true,
jobKeyForget: false,
failedPods: 1,
expectedFailed: 1,
expectedCondition: &jobConditionFailed,
@ -525,7 +572,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: false,
expectedCreations: 2,
expectedActive: 2,
expectedCreatedIndexes: sets.NewInt(0, 1),
@ -535,7 +581,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 3,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: true,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodFailed},
@ -554,7 +599,6 @@ func TestControllerSyncJob(t *testing.T) {
completions: 3,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: true,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodSucceeded},
@ -582,7 +626,6 @@ func TestControllerSyncJob(t *testing.T) {
{"8", v1.PodSucceeded},
{"9", v1.PodSucceeded},
},
jobKeyForget: true,
expectedCreations: 6,
expectedActive: 8,
expectedSucceeded: 6,
@ -623,7 +666,6 @@ func TestControllerSyncJob(t *testing.T) {
{"1", v1.PodRunning},
{"2", v1.PodRunning},
},
jobKeyForget: true,
expectedDeletions: 3,
expectedActive: 2,
expectedSucceeded: 1,
@ -645,7 +687,6 @@ func TestControllerSyncJob(t *testing.T) {
{"2", v1.PodRunning},
{"2", v1.PodPending},
},
jobKeyForget: true,
expectedCreations: 0,
expectedDeletions: 2,
expectedActive: 2,
@ -665,7 +706,6 @@ func TestControllerSyncJob(t *testing.T) {
{"7", v1.PodPending},
{"8", v1.PodFailed},
},
jobKeyForget: true,
expectedCreations: 0, // only one of creations and deletions can happen in a sync
expectedSucceeded: 1,
expectedDeletions: 2,
@ -682,7 +722,6 @@ func TestControllerSyncJob(t *testing.T) {
activePods: 2, // parallelism == active, expectations satisfied
completions: 4,
backoffLimit: 6,
jobKeyForget: true,
expectedCreations: 0,
expectedDeletions: 2,
expectedActive: 0,
@ -702,7 +741,6 @@ func TestControllerSyncJob(t *testing.T) {
fakeExpectationAtCreation: -1, // the controller is expecting a deletion
completions: 4,
backoffLimit: 6,
jobKeyForget: false,
expectedCreations: 0,
expectedDeletions: 0,
expectedActive: 3,
@ -713,7 +751,6 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 4,
backoffLimit: 6,
jobKeyForget: true,
expectedCreations: 2,
expectedDeletions: 0,
expectedActive: 2,
@ -732,7 +769,6 @@ func TestControllerSyncJob(t *testing.T) {
activePods: 2, // parallelism == active, expectations satisfied
completions: 4,
backoffLimit: 6,
jobKeyForget: false,
expectedCreations: 0,
expectedDeletions: 0,
expectedActive: 2,
@ -746,7 +782,15 @@ func TestControllerSyncJob(t *testing.T) {
// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
var fakeClock clock.WithTicker
if tc.controllerTime != nil {
fakeClock = clocktesting.NewFakeClock(*tc.controllerTime)
} else {
fakeClock = clocktesting.NewFakeClock(time.Now())
}
manager, sharedInformerFactory := newControllerFromClientWithClock(clientSet, controller.NoResyncPeriodFunc, fakeClock)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -761,11 +805,21 @@ func TestControllerSyncJob(t *testing.T) {
job.Status.Active = int32(tc.initialStatus.active)
job.Status.Succeeded = int32(tc.initialStatus.succeed)
job.Status.Failed = int32(tc.initialStatus.failed)
if tc.initialStatus.startTime != nil {
startTime := metav1.NewTime(*tc.initialStatus.startTime)
job.Status.StartTime = &startTime
}
}
key, err := controller.KeyFunc(job)
if err != nil {
t.Errorf("Unexpected error getting job key: %v", err)
}
if tc.backoffRecord != nil {
tc.backoffRecord.key = key
manager.backoffRecordStore.updateBackoffRecord(*tc.backoffRecord)
}
if tc.fakeExpectationAtCreation < 0 {
manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation))
} else if tc.fakeExpectationAtCreation > 0 {
@ -790,7 +844,7 @@ func TestControllerSyncJob(t *testing.T) {
}
// run
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err = manager.syncJob(context.TODO(), testutil.GetKey(job, t))
// We need to requeue the syncJob task if there is a podController error
if tc.podControllerError != nil {
@ -808,9 +862,6 @@ func TestControllerSyncJob(t *testing.T) {
} else if err != nil {
t.Errorf("Unexpected error when syncing jobs: %v", err)
}
if forget != tc.jobKeyForget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
}
// validate created/deleted pods
if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
@ -928,7 +979,7 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
}
}
func TestGetStatus(t *testing.T) {
func TestGetNewFinishedPods(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
@ -1025,7 +1076,9 @@ func TestGetStatus(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
if succeeded != tc.wantSucceeded {
t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
}
@ -1597,7 +1650,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
if isIndexedJob(job) {
succeededIndexes = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions))
}
err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush)
err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr)
}
@ -1742,13 +1795,10 @@ func TestSyncJobPastDeadline(t *testing.T) {
setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0)
// run
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if forget != tc.expectedForGetKey {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.expectedForGetKey, forget)
}
// validate created/deleted pods
if int32(len(fakePodControl.Templates)) != 0 {
t.Errorf("Unexpected number of creates. Expected 0, saw %d\n", len(fakePodControl.Templates))
@ -1903,13 +1953,10 @@ func TestSingleJobFailedCondition(t *testing.T) {
job.Status.StartTime = &start
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now()))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
}
if len(fakePodControl.DeletePodName) != 0 {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
}
@ -1937,13 +1984,10 @@ func TestSyncJobComplete(t *testing.T) {
job := newJob(1, 1, 6, batch.NonIndexedCompletion)
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now()))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
if err != nil {
t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
@ -1965,13 +2009,10 @@ func TestSyncJobDeleted(t *testing.T) {
return job, nil
}
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
if len(fakePodControl.Templates) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
}
@ -2081,7 +2122,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
Name: "main-container",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 42,
ExitCode: 42,
FinishedAt: testFinishedAt,
},
},
},
@ -2312,7 +2354,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
Name: "main-container",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 5,
ExitCode: 5,
FinishedAt: testFinishedAt,
},
},
},
@ -2507,7 +2550,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
Name: "main-container",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 5,
ExitCode: 5,
FinishedAt: testFinishedAt,
},
},
},
@ -2741,7 +2785,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 2,
ExitCode: 2,
FinishedAt: testFinishedAt,
},
},
},
@ -2803,6 +2848,15 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
Status: v1.ConditionTrue,
},
},
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: testFinishedAt,
},
},
},
},
},
},
},
@ -3627,7 +3681,7 @@ func TestWatchJobs(t *testing.T) {
// The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler.
manager.syncHandler = func(ctx context.Context, key string) (bool, error) {
manager.syncHandler = func(ctx context.Context, key string) error {
defer close(received)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
@ -3636,12 +3690,12 @@ func TestWatchJobs(t *testing.T) {
job, err := manager.jobLister.Jobs(ns).Get(name)
if err != nil || job == nil {
t.Errorf("Expected to find job under key %v: %v", key, err)
return true, nil
return nil
}
if !apiequality.Semantic.DeepDerivative(*job, testJob) {
t.Errorf("Expected %#v, but got %#v", testJob, *job)
}
return true, nil
return nil
}
// Start only the job watcher and the workqueue, send a watch event,
// and make sure it hits the sync method.
@ -3672,7 +3726,7 @@ func TestWatchPods(t *testing.T) {
received := make(chan struct{})
// The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler.
manager.syncHandler = func(ctx context.Context, key string) (bool, error) {
manager.syncHandler = func(ctx context.Context, key string) error {
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@ -3684,10 +3738,10 @@ func TestWatchPods(t *testing.T) {
if !apiequality.Semantic.DeepDerivative(job, testJob) {
t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
close(received)
return true, nil
return nil
}
close(received)
return true, nil
return nil
}
// Start only the pod watcher and the workqueue, send a watch event,
// and make sure it hits the sync method for the right job.
@ -4080,14 +4134,11 @@ func TestJobBackoffForOnFailure(t *testing.T) {
}
// run
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("unexpected error syncing job. Got %#v", err)
}
if forget != tc.jobKeyForget {
t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
}
// validate status
if actual.Status.Active != tc.expectedActive {
t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
@ -4177,6 +4228,9 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{
FinishedAt: testFinishedAt,
}}}}
podIndexer.Add(pod)
}
for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) {
@ -4184,14 +4238,11 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
}
// run
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if (err != nil) != tc.isExpectingAnError {
t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
}
if forget != tc.jobKeyForget {
t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
}
// validate status
if actual.Status.Active != tc.expectedActive {
t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)


@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
@ -399,7 +400,9 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
Ready: pointer.Int32(0),
})
jobPods, err := getJobPods(ctx, t, cs, jobObj)
jobPods, err := getJobPods(ctx, t, cs, jobObj, func(s v1.PodStatus) bool {
return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning)
})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
@ -543,6 +546,13 @@ func TestJobPodFailurePolicy(t *testing.T) {
}
podStatusNotMatchingAnyRule := v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
},
}
testCases := map[string]struct {
enableJobPodFailurePolicy bool
@ -1367,6 +1377,92 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
// overwrite the default value for faster testing
oldBackoff := jobcontroller.DefaultJobBackOff
defer func() { jobcontroller.DefaultJobBackOff = oldBackoff }()
jobcontroller.DefaultJobBackOff = 2 * time.Second
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
if err != nil {
t.Fatalf("Could not create job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
})
// Fail the first pod
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
Failed: 1,
})
// Fail the second pod
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: pointer.Int32(0),
Failed: 2,
})
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
if len(jobPods) != 3 {
t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods))
}
creationTime := []time.Time{}
finishTime := []time.Time{}
for _, pod := range jobPods {
creationTime = append(creationTime, pod.CreationTimestamp.Time)
if len(pod.Status.ContainerStatuses) > 0 {
finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time)
}
}
sort.Slice(creationTime, func(i, j int) bool {
return creationTime[i].Before(creationTime[j])
})
sort.Slice(finishTime, func(i, j int) bool {
return finishTime[i].Before(finishTime[j])
})
if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobBackOff.Seconds() {
t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobBackOff)
}
if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobBackOff.Seconds() {
t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobBackOff)
}
diff := creationTime[2].Sub(finishTime[1]).Seconds()
// The third pod should not be created before 4 seconds
if diff < 2*jobcontroller.DefaultJobBackOff.Seconds() {
t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobBackOff)
}
// The third pod should be created within 8 seconds
// This check rules out double counting
if diff >= 4*jobcontroller.DefaultJobBackOff.Seconds() {
t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobBackOff)
}
}
// TestJobFailedWithInterrupts tests that a job where one pod fails and the rest
// succeed is marked as Failed, even if the controller fails in the middle.
func TestJobFailedWithInterrupts(t *testing.T) {
@ -1686,7 +1782,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
}
}
func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) ([]*v1.Pod, error) {
func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, filter func(v1.PodStatus) bool) ([]*v1.Pod, error) {
t.Helper()
allPods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
@ -1694,8 +1790,7 @@ func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface
}
jobPods := make([]*v1.Pod, 0, 0)
for _, pod := range allPods.Items {
phase := pod.Status.Phase
if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
if metav1.IsControlledBy(&pod, jobObj) && filter(pod.Status) {
p := pod
jobPods = append(jobPods, &p)
}
@ -1817,6 +1912,17 @@ func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
op := func(p *v1.Pod) bool {
p.Status.Phase = phase
if phase == v1.PodFailed || phase == v1.PodSucceeded {
p.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
FinishedAt: metav1.Now(),
},
},
},
}
}
return true
}
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)