diff --git a/pkg/controller/job/backoff_utils.go b/pkg/controller/job/backoff_utils.go new file mode 100644 index 00000000000..16c0f3aa3f2 --- /dev/null +++ b/pkg/controller/job/backoff_utils.go @@ -0,0 +1,205 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "sort" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/clock" +) + +type backoffRecord struct { + key string + failuresAfterLastSuccess int32 + lastFailureTime *time.Time +} + +type backoffStore struct { + store cache.Store +} + +func (s *backoffStore) updateBackoffRecord(record backoffRecord) error { + b, ok, err := s.store.GetByKey(record.key) + if err != nil { + return err + } + + if !ok { + err = s.store.Add(&record) + if err != nil { + return err + } + } else { + backoffRecord := b.(*backoffRecord) + backoffRecord.failuresAfterLastSuccess = record.failuresAfterLastSuccess + backoffRecord.lastFailureTime = record.lastFailureTime + } + + return nil +} + +func (s *backoffStore) removeBackoffRecord(jobId string) error { + b, ok, err := s.store.GetByKey(jobId) + if err != nil { + return err + } + + if ok { + err = s.store.Delete(b) + if err != nil { + return err + } + } + + return nil + +} + +func newBackoffRecordStore() *backoffStore { + return &backoffStore{ + store: cache.NewStore(backoffRecordKeyFunc), + } +} + +var backoffRecordKeyFunc = func(obj interface{}) (string, error) { + if u, ok := obj.(*backoffRecord); ok { + return u.key, nil + } + return "", fmt.Errorf("could not find key for obj %#v", obj) +} + +func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker, key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord { + now := clock.Now() + var backoff *backoffRecord + + if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists { + old := b.(*backoffRecord) + backoff = &backoffRecord{ + key: old.key, + failuresAfterLastSuccess: old.failuresAfterLastSuccess, + lastFailureTime: old.lastFailureTime, + } + } else { + backoff = &backoffRecord{ + key: key, + failuresAfterLastSuccess: 0, + lastFailureTime: nil, + } + } + + sortByFinishedTime(newSucceededPods, now) + sortByFinishedTime(newFailedPods, now) + + if len(newSucceededPods) == 0 { + if len(newFailedPods) == 0 { + return *backoff + } + + backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods)) + lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1], now) + backoff.lastFailureTime = &lastFailureTime + return *backoff + + } else { + if len(newFailedPods) == 0 { + backoff.failuresAfterLastSuccess = 0 + backoff.lastFailureTime = nil + return *backoff + } + + backoff.failuresAfterLastSuccess = 0 + backoff.lastFailureTime = nil + + lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1], now) + for i := len(newFailedPods) - 1; i >= 0; i-- { + failedTime := getFinishedTime(newFailedPods[i], now) + if !failedTime.After(lastSuccessTime) { + break + } + if 
backoff.lastFailureTime == nil { + backoff.lastFailureTime = &failedTime + } + backoff.failuresAfterLastSuccess += 1 + } + + return *backoff + + } + +} + +func sortByFinishedTime(pods []*v1.Pod, currentTime time.Time) { + sort.Slice(pods, func(i, j int) bool { + p1 := pods[i] + p2 := pods[j] + p1FinishTime := getFinishedTime(p1, currentTime) + p2FinishTime := getFinishedTime(p2, currentTime) + + return p1FinishTime.Before(p2FinishTime) + }) +} + +func getFinishedTime(p *v1.Pod, currentTime time.Time) time.Time { + var finishTime *time.Time + for _, containerState := range p.Status.ContainerStatuses { + if containerState.State.Terminated == nil { + finishTime = nil + break + } + + if finishTime == nil { + finishTime = &containerState.State.Terminated.FinishedAt.Time + } else { + if finishTime.Before(containerState.State.Terminated.FinishedAt.Time) { + finishTime = &containerState.State.Terminated.FinishedAt.Time + } + } + } + + if finishTime == nil || finishTime.IsZero() { + return currentTime + } + + return *finishTime +} + +func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration { + if backoff.failuresAfterLastSuccess == 0 { + return 0 + } + + backoffDuration := defaultBackoff + for i := 1; i < int(backoff.failuresAfterLastSuccess); i++ { + backoffDuration = backoffDuration * 2 + if backoffDuration >= maxBackoff { + backoffDuration = maxBackoff + break + } + } + + timeElapsedSinceLastFailure := clock.Since(*backoff.lastFailureTime) + + if backoffDuration < timeElapsedSinceLastFailure { + return 0 + } + + return backoffDuration - timeElapsedSinceLastFailure +} diff --git a/pkg/controller/job/backoff_utils_test.go b/pkg/controller/job/backoff_utils_test.go new file mode 100644 index 00000000000..79e0f2a4d2c --- /dev/null +++ b/pkg/controller/job/backoff_utils_test.go @@ -0,0 +1,395 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package job + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clocktesting "k8s.io/utils/clock/testing" +) + +func TestNewBackoffRecord(t *testing.T) { + emptyStoreInitializer := func(*backoffStore) {} + defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + testCases := map[string]struct { + storeInitializer func(*backoffStore) + uncounted uncountedTerminatedPods + newSucceededPods []metav1.Time + newFailedPods []metav1.Time + wantBackoffRecord backoffRecord + }{ + "Empty backoff store and one new failure": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{}, + newFailedPods: []metav1.Time{ + defaultTestTime, + }, + wantBackoffRecord: backoffRecord{ + key: "key", + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 1, + }, + }, + "Empty backoff store and two new failures": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{}, + newFailedPods: []metav1.Time{ + defaultTestTime, + metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)), + }, + wantBackoffRecord: backoffRecord{ + key: "key", + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 2, + }, + }, + "Empty backoff store, two failures followed by success": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{ + defaultTestTime, + }, + newFailedPods: []metav1.Time{ + metav1.NewTime(defaultTestTime.Add(-2 * time.Millisecond)), + metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)), + }, + wantBackoffRecord: backoffRecord{ + key: "key", + failuresAfterLastSuccess: 0, + }, + }, + "Empty backoff store, two failures, one success and two more failures": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{ + metav1.NewTime(defaultTestTime.Add(-2 * time.Millisecond)), + }, + newFailedPods: []metav1.Time{ + defaultTestTime, + metav1.NewTime(defaultTestTime.Add(-4 * time.Millisecond)), + metav1.NewTime(defaultTestTime.Add(-3 * time.Millisecond)), + metav1.NewTime(defaultTestTime.Add(-1 * time.Millisecond)), + }, + wantBackoffRecord: backoffRecord{ + key: "key", + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 2, + }, + }, + "Backoff store having failure count 2 and one new failure": { + storeInitializer: func(bis *backoffStore) { + bis.updateBackoffRecord(backoffRecord{ + key: "key", + failuresAfterLastSuccess: 2, + lastFailureTime: nil, + }) + }, + newSucceededPods: []metav1.Time{}, + newFailedPods: []metav1.Time{ + defaultTestTime, + }, + wantBackoffRecord: backoffRecord{ + key: "key", + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 3, + }, + }, + "Empty backoff store with success and failure at same timestamp": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{ + defaultTestTime, + }, + newFailedPods: []metav1.Time{ + defaultTestTime, + }, + wantBackoffRecord: backoffRecord{ + key: "key", + failuresAfterLastSuccess: 0, + }, + }, + "Empty backoff store with no success/failure": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{}, + newFailedPods: []metav1.Time{}, + wantBackoffRecord: backoffRecord{ + key: "key", + failuresAfterLastSuccess: 0, + }, + }, + "Empty backoff store with one success": { + storeInitializer: emptyStoreInitializer, + newSucceededPods: []metav1.Time{ + defaultTestTime, + }, + newFailedPods: []metav1.Time{}, + wantBackoffRecord: 
backoffRecord{ + key: "key", + failuresAfterLastSuccess: 0, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + backoffRecordStore := newBackoffRecordStore() + tc.storeInitializer(backoffRecordStore) + + newSucceededPods := []*v1.Pod{} + newFailedPods := []*v1.Pod{} + + for _, finishTime := range tc.newSucceededPods { + newSucceededPods = append(newSucceededPods, &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: finishTime, + }, + }, + }, + }, + }, + }) + } + + for _, finishTime := range tc.newFailedPods { + newFailedPods = append(newFailedPods, &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: finishTime, + }, + }, + }, + }, + }, + }) + } + + fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) + + backoffRecord := backoffRecordStore.newBackoffRecord(fakeClock, "key", newSucceededPods, newFailedPods) + if diff := cmp.Diff(tc.wantBackoffRecord, backoffRecord, cmp.AllowUnexported(backoffRecord)); diff != "" { + t.Errorf("backoffRecord not matching; (-want,+got): %v", diff) + } + }) + } +} + +func TestGetFinishedTime(t *testing.T) { + defaultTestTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + testCases := map[string]struct { + pod v1.Pod + wantFinishTime time.Time + }{ + "Pod with multiple containers and all containers terminated": { + pod: v1.Pod{ + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-1 * time.Second))}, + }, + }, + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime)}, + }, + }, + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-2 * time.Second))}, + }, + }, + }, + }, + }, + wantFinishTime: defaultTestTime, + }, + "Pod with multiple containers; two containers in terminated state and one in running state": { + pod: v1.Pod{ + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-1 * time.Second))}, + }, + }, + { + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{FinishedAt: metav1.NewTime(defaultTestTime.Add(-2 * time.Second))}, + }, + }, + }, + }, + }, + wantFinishTime: defaultTestTime, + }, + "Pod with single container in running state": { + pod: v1.Pod{ + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + wantFinishTime: defaultTestTime, + }, + "Pod with single container with zero finish time": { + pod: v1.Pod{ + Status: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, + }, + }, + wantFinishTime: defaultTestTime, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + f := getFinishedTime(&tc.pod, defaultTestTime) + 
if !f.Equal(tc.wantFinishTime) { + t.Errorf("Expected value of finishedTime %v; got %v", tc.wantFinishTime, f) + } + }) + } +} + +func TestGetRemainingBackoffTime(t *testing.T) { + defaultTestTime := metav1.NewTime(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + testCases := map[string]struct { + backoffRecord backoffRecord + currentTime time.Time + maxBackoff time.Duration + defaultBackoff time.Duration + wantDuration time.Duration + }{ + "no failures": { + backoffRecord: backoffRecord{ + lastFailureTime: nil, + failuresAfterLastSuccess: 0, + }, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 0 * time.Second, + }, + "one failure; current time and failure time are same": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 1, + }, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 5 * time.Second, + }, + "one failure; current time == 1 second + failure time": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 1, + }, + currentTime: defaultTestTime.Time.Add(time.Second), + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 4 * time.Second, + }, + "one failure; current time == expected backoff time": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 1, + }, + currentTime: defaultTestTime.Time.Add(5 * time.Second), + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 0 * time.Second, + }, + "one failure; current time == expected backoff time + 1 Second": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 1, + }, + currentTime: defaultTestTime.Time.Add(6 * time.Second), + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 0 * time.Second, + }, + "three failures; current time and failure time are same": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 3, + }, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 20 * time.Second, + }, + "eight failures; current time and failure time are same; backoff not exceeding maxBackoff": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 8, + }, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 640 * time.Second, + }, + "nine failures; current time and failure time are same; backoff exceeding maxBackoff": { + backoffRecord: backoffRecord{ + lastFailureTime: &defaultTestTime.Time, + failuresAfterLastSuccess: 9, + }, + currentTime: defaultTestTime.Time, + defaultBackoff: 5 * time.Second, + maxBackoff: 700 * time.Second, + wantDuration: 700 * time.Second, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + fakeClock := clocktesting.NewFakeClock(tc.currentTime.Truncate(time.Second)) + d := tc.backoffRecord.getRemainingTime(fakeClock, tc.defaultBackoff, tc.maxBackoff) + if d.Seconds() != tc.wantDuration.Seconds() { + t.Errorf("Expected value of duration %v; got %v", tc.wantDuration, d) + } + }) + } +} diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 17c57823d30..505ecac1197 100644 --- a/pkg/controller/job/job_controller.go +++ 
b/pkg/controller/job/job_controller.go @@ -93,7 +93,7 @@ type Controller struct { // To allow injection of the following for testing. updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error) patchJobHandler func(ctx context.Context, job *batch.Job, patch []byte) error - syncHandler func(ctx context.Context, jobKey string) (bool, error) + syncHandler func(ctx context.Context, jobKey string) error // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced cache.InformerSynced @@ -126,6 +126,8 @@ type Controller struct { podUpdateBatchPeriod time.Duration clock clock.WithTicker + + backoffRecordStore *backoffStore } // NewController creates a new Job controller that keeps the relevant pods @@ -150,6 +152,7 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b broadcaster: eventBroadcaster, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), clock: clock, + backoffRecordStore: newBackoffRecordStore(), } if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { jm.podUpdateBatchPeriod = podUpdateBatchPeriod @@ -544,11 +547,9 @@ func (jm *Controller) processNextWorkItem(ctx context.Context) bool { } defer jm.queue.Done(key) - forget, err := jm.syncHandler(ctx, key.(string)) + err := jm.syncHandler(ctx, key.(string)) if err == nil { - if forget { - jm.queue.Forget(key) - } + jm.queue.Forget(key) return true } @@ -669,7 +670,7 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // concurrently with the same key. 
-func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rErr error) { +func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { startTime := jm.clock.Now() defer func() { klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime)) @@ -677,10 +678,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return false, err + return err } if len(ns) == 0 || len(name) == 0 { - return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key) + return fmt.Errorf("invalid job key %q: either namespace or name is missing", key) } sharedJob, err := jm.jobLister.Jobs(ns).Get(name) if err != nil { @@ -688,21 +689,32 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr klog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) jm.finalizerExpectations.deleteExpectations(key) - return true, nil + + err := jm.backoffRecordStore.removeBackoffRecord(key) + if err != nil { + // re-syncing here as the record has to be removed for finished/deleted jobs + return fmt.Errorf("error removing backoff record %w", err) + } + return nil } - return false, err + return err } // make a copy so we don't mutate the shared cache job := *sharedJob.DeepCopy() // if job was finished previously, we don't want to redo the termination if IsJobFinished(&job) { - return true, nil + err := jm.backoffRecordStore.removeBackoffRecord(key) + if err != nil { + // re-syncing here as the record has to be removed for finished/deleted jobs + return fmt.Errorf("error removing backoff record %w", err) + } + return nil } if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion { jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown") - return false, nil + return nil } completionMode := getCompletionMode(&job) @@ -731,12 +743,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr pods, err := jm.getPodsForJob(ctx, &job) if err != nil { - return false, err + return err } activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers) + newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers) + succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded)) + failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed)) var ready *int32 if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { ready = pointer.Int32(countReadyPods(activePods)) @@ -748,6 +762,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr job.Status.StartTime = &now } + newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(jm.clock, key, newSucceededPods, newFailedPods) + var manageJobErr error var finishedCondition *batch.JobCondition @@ -799,7 +815,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr } else { manageJobCalled := false if satisfiedExpectations && job.DeletionTimestamp == nil { - active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes) + active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, 
succeededIndexes, newBackoffInfo) manageJobCalled = true } complete := false @@ -852,34 +868,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr } } - // Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true - // This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to - // improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed. - // In this case, we should clear the backoff delay. - forget = job.Status.Succeeded < succeeded - needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) job.Status.Active = active job.Status.Ready = ready - err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) + err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo) if err != nil { if apierrors.IsConflict(err) { // we probably have a stale informer cache // so don't return an error to avoid backoff jm.enqueueController(&job, false) - return false, nil + return nil } - return false, fmt.Errorf("tracking status: %w", err) + return fmt.Errorf("tracking status: %w", err) } + jobFinished := IsJobFinished(&job) if jobHasNewFailure && !jobFinished { // returning an error will re-enqueue Job after the backoff period - return forget, fmt.Errorf("failed pod(s) detected for job key %q", key) + return fmt.Errorf("failed pod(s) detected for job key %q", key) } - if suspendCondChanged { - forget = true - } - return forget, manageJobErr + + return manageJobErr } // deleteActivePods issues deletion for active Pods, preserving finalizers. @@ -952,7 +961,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey // // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. 
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { +func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -1064,7 +1073,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job } } var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil { + if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil { return err } jobFinished := jm.enactJobFinished(job, finishedCond) @@ -1094,12 +1103,22 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) { var err error if needsFlush { if job, err = jm.updateStatusHandler(ctx, job); err != nil { return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } + + err = jm.backoffRecordStore.updateBackoffRecord(newBackoffRecord) + + if err != nil { + // this error might undercount the backoff. + // re-syncing from the current state might not help to recover + // the backoff information + klog.ErrorS(err, "Backoff update failed") + } + recordJobPodFinished(job, *oldCounters) // Shallow copy, as it will only be used to detect changes in the counters. *oldCounters = job.Status @@ -1316,15 +1335,13 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s return nil } -// getStatus returns number of succeeded and failed pods running a job. The number -// of failed pods can be affected by the podFailurePolicy. -func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) { - succeeded = job.Status.Succeeded - failed = job.Status.Failed - succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { +// getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted +// in the job status. The list of failed pods can be affected by the podFailurePolicy. 
+func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeededPods, failedPods []*v1.Pod) { + succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded - })) - failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool { + }) + failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { if !isPodFailed(p, job) { return false @@ -1334,8 +1351,8 @@ func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPod } else { return isPodFailed(p, job) } - })) - return succeeded, failed + }) + return succeededPods, failedPods } // jobSuspended returns whether a Job is suspended while taking the feature @@ -1346,8 +1363,9 @@ func jobSuspended(job *batch.Job) bool { // manageJob is the core method responsible for managing the number of running // pods according to what is specified in the job.Spec. +// Respects back-off; does not create new pods if the back-off time has not passed // Does NOT modify . -func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) { +func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) { active := int32(len(activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) @@ -1408,6 +1426,11 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods } if active < wantActive { + remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff) + if remainingTime > 0 { + jm.enqueueControllerDelayed(job, true, remainingTime) + return 0, metrics.JobSyncActionPodsCreated, nil + } diff := wantActive - active if diff > int32(MaxPodCreateDeletePerSync) { diff = int32(MaxPodCreateDeletePerSync) @@ -1554,13 +1577,14 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur return calculated } -// countValidPodsWithFilter returns number of valid pods that pass the filter. -// Pods are valid if they have a finalizer and, for Indexed Jobs, a valid -// completion index. -func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) int { - result := len(uncounted) +// getValidPodsWithFilter returns the valid pods that pass the filter. +// Pods are valid if they have a finalizer or in uncounted set +// and, for Indexed Jobs, a valid completion index. +func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) []*v1.Pod { + var result []*v1.Pod for _, p := range pods { uid := string(p.UID) + // Pods that don't have a completion finalizer are in the uncounted set or // have already been accounted for in the Job status. 
if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) { @@ -1573,7 +1597,7 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str } } if filter(p) { - result++ + result = append(result, p) } } return result diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 6cfadb3269c..b1f62ab5022 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -62,6 +62,10 @@ import ( var realClock = &clock.RealClock{} var alwaysReady = func() bool { return true } +// testFinishedAt represents time one second later than unix epoch +// this will be used in various test cases where we don't want back-off to kick in +var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second)) + func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job { j := &batch.Job{ TypeMeta: metav1.TypeMeta{Kind: "Job"}, @@ -141,6 +145,15 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod { for i := 0; i < count; i++ { newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) newPod.Status = v1.PodStatus{Phase: status} + newPod.Status.ContainerStatuses = []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: testFinishedAt, + }, + }, + }, + } newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer) pods = append(pods, newPod) } @@ -176,6 +189,17 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status for _, s := range status { p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job) p.Status = v1.PodStatus{Phase: s.Phase} + if s.Phase == v1.PodFailed || s.Phase == v1.PodSucceeded { + p.Status.ContainerStatuses = []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: testFinishedAt, + }, + }, + }, + } + } if s.Index != noIndex { p.Annotations = map[string]string{ batch.JobCompletionIndexAnnotation: s.Index, @@ -188,15 +212,17 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status } type jobInitialStatus struct { - active int - succeed int - failed int + active int + succeed int + failed int + startTime *time.Time } func TestControllerSyncJob(t *testing.T) { jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed jobConditionSuspended := batch.JobSuspended + referenceTime := time.Now() testCases := map[string]struct { // job setup @@ -209,6 +235,8 @@ func TestControllerSyncJob(t *testing.T) { wasSuspended bool suspend bool initialStatus *jobInitialStatus + backoffRecord *backoffRecord + controllerTime *time.Time // pod setup @@ -216,7 +244,6 @@ func TestControllerSyncJob(t *testing.T) { // This means that there is no status update so the counters for // failedPods and succeededPods cannot be incremented. 
podControllerError error - jobKeyForget bool pendingPods int activePods int readyPods int @@ -246,7 +273,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: false, expectedCreations: 2, expectedActive: 2, }, @@ -254,7 +280,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: false, expectedCreations: 2, expectedActive: 2, }, @@ -262,7 +287,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: false, pendingPods: 2, expectedActive: 2, }, @@ -270,7 +294,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 3, completions: 5, backoffLimit: 6, - jobKeyForget: false, activePods: 3, readyPods: 2, expectedActive: 3, @@ -279,7 +302,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 3, completions: 5, backoffLimit: 6, - jobKeyForget: false, activePods: 3, readyPods: 2, expectedActive: 3, @@ -290,7 +312,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: false, activePods: 2, expectedActive: 2, }, @@ -298,7 +319,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: true, activePods: 1, succeededPods: 1, expectedCreations: 1, @@ -306,11 +326,48 @@ func TestControllerSyncJob(t *testing.T) { expectedSucceeded: 1, expectedPodPatches: 1, }, + "too few active pods and active back-off": { + parallelism: 1, + completions: 1, + backoffLimit: 6, + backoffRecord: &backoffRecord{ + failuresAfterLastSuccess: 1, + lastFailureTime: &referenceTime, + }, + initialStatus: &jobInitialStatus{ + startTime: func() *time.Time { + now := time.Now() + return &now + }(), + }, + activePods: 0, + succeededPods: 0, + expectedCreations: 0, + expectedActive: 0, + expectedSucceeded: 0, + expectedPodPatches: 0, + controllerTime: &referenceTime, + }, + "too few active pods and no back-offs": { + parallelism: 1, + completions: 1, + backoffLimit: 6, + backoffRecord: &backoffRecord{ + failuresAfterLastSuccess: 0, + lastFailureTime: &referenceTime, + }, + activePods: 0, + succeededPods: 0, + expectedCreations: 1, + expectedActive: 1, + expectedSucceeded: 0, + expectedPodPatches: 0, + controllerTime: &referenceTime, + }, "too few active pods with a dynamic job": { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: false, activePods: 1, expectedCreations: 1, expectedActive: 2, @@ -320,7 +377,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 5, backoffLimit: 6, podControllerError: fmt.Errorf("fake error"), - jobKeyForget: false, activePods: 1, succeededPods: 1, expectedCreations: 1, @@ -332,7 +388,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: false, activePods: 3, expectedDeletions: 1, expectedActive: 2, @@ -343,7 +398,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 5, backoffLimit: 6, podControllerError: fmt.Errorf("fake error"), - jobKeyForget: false, activePods: 3, expectedDeletions: 0, expectedPodPatches: 1, @@ -353,7 +407,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: true, activePods: 1, succeededPods: 1, failedPods: 1, @@ -375,10 +428,14 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches: 1, }, "no new pod; possible finalizer update of failed pod": { - parallelism: 1, - completions: 1, - backoffLimit: 6, - initialStatus: 
&jobInitialStatus{1, 0, 1}, + parallelism: 1, + completions: 1, + backoffLimit: 6, + initialStatus: &jobInitialStatus{ + active: 1, + succeed: 0, + failed: 1, + }, activePods: 1, failedPods: 0, expectedCreations: 0, @@ -402,7 +459,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: true, succeededPods: 5, expectedSucceeded: 5, expectedCondition: &jobConditionComplete, @@ -413,7 +469,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: true, activePods: 1, succeededPods: 1, expectedActive: 1, @@ -424,7 +479,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: true, succeededPods: 2, expectedSucceeded: 2, expectedCondition: &jobConditionComplete, @@ -435,7 +489,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: -1, backoffLimit: 6, - jobKeyForget: true, succeededPods: 1, failedPods: 1, expectedSucceeded: 1, @@ -448,7 +501,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: false, activePods: 10, expectedDeletions: 8, expectedActive: 2, @@ -458,7 +510,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 3, completions: 4, backoffLimit: 6, - jobKeyForget: true, activePods: 3, succeededPods: 2, expectedDeletions: 1, @@ -470,7 +521,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, backoffLimit: 6, - jobKeyForget: true, activePods: 2, succeededPods: 2, expectedActive: 2, @@ -482,7 +532,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 5, backoffLimit: 6, deleting: true, - jobKeyForget: true, pendingPods: 1, activePods: 1, succeededPods: 1, @@ -495,7 +544,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 200, backoffLimit: 6, podLimit: 10, - jobKeyForget: false, expectedCreations: 10, expectedActive: 10, }, @@ -503,7 +551,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 5, deleting: true, - jobKeyForget: false, failedPods: 1, expectedFailed: 1, expectedCondition: &jobConditionFailed, @@ -525,7 +572,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 5, backoffLimit: 6, completionMode: batch.IndexedCompletion, - jobKeyForget: false, expectedCreations: 2, expectedActive: 2, expectedCreatedIndexes: sets.NewInt(0, 1), @@ -535,7 +581,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 3, backoffLimit: 6, completionMode: batch.IndexedCompletion, - jobKeyForget: true, podsWithIndexes: []indexPhase{ {"0", v1.PodSucceeded}, {"1", v1.PodFailed}, @@ -554,7 +599,6 @@ func TestControllerSyncJob(t *testing.T) { completions: 3, backoffLimit: 6, completionMode: batch.IndexedCompletion, - jobKeyForget: true, podsWithIndexes: []indexPhase{ {"0", v1.PodSucceeded}, {"1", v1.PodSucceeded}, @@ -582,7 +626,6 @@ func TestControllerSyncJob(t *testing.T) { {"8", v1.PodSucceeded}, {"9", v1.PodSucceeded}, }, - jobKeyForget: true, expectedCreations: 6, expectedActive: 8, expectedSucceeded: 6, @@ -623,7 +666,6 @@ func TestControllerSyncJob(t *testing.T) { {"1", v1.PodRunning}, {"2", v1.PodRunning}, }, - jobKeyForget: true, expectedDeletions: 3, expectedActive: 2, expectedSucceeded: 1, @@ -645,7 +687,6 @@ func TestControllerSyncJob(t *testing.T) { {"2", v1.PodRunning}, {"2", v1.PodPending}, }, - jobKeyForget: true, expectedCreations: 0, expectedDeletions: 2, expectedActive: 2, @@ -665,7 +706,6 @@ func TestControllerSyncJob(t *testing.T) { {"7", 
v1.PodPending}, {"8", v1.PodFailed}, }, - jobKeyForget: true, expectedCreations: 0, // only one of creations and deletions can happen in a sync expectedSucceeded: 1, expectedDeletions: 2, @@ -682,7 +722,6 @@ func TestControllerSyncJob(t *testing.T) { activePods: 2, // parallelism == active, expectations satisfied completions: 4, backoffLimit: 6, - jobKeyForget: true, expectedCreations: 0, expectedDeletions: 2, expectedActive: 0, @@ -702,7 +741,6 @@ func TestControllerSyncJob(t *testing.T) { fakeExpectationAtCreation: -1, // the controller is expecting a deletion completions: 4, backoffLimit: 6, - jobKeyForget: false, expectedCreations: 0, expectedDeletions: 0, expectedActive: 3, @@ -713,7 +751,6 @@ func TestControllerSyncJob(t *testing.T) { parallelism: 2, completions: 4, backoffLimit: 6, - jobKeyForget: true, expectedCreations: 2, expectedDeletions: 0, expectedActive: 2, @@ -732,7 +769,6 @@ func TestControllerSyncJob(t *testing.T) { activePods: 2, // parallelism == active, expectations satisfied completions: 4, backoffLimit: 6, - jobKeyForget: false, expectedCreations: 0, expectedDeletions: 0, expectedActive: 2, @@ -746,7 +782,15 @@ func TestControllerSyncJob(t *testing.T) { // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + + var fakeClock clock.WithTicker + if tc.controllerTime != nil { + fakeClock = clocktesting.NewFakeClock(*tc.controllerTime) + } else { + fakeClock = clocktesting.NewFakeClock(time.Now()) + } + + manager, sharedInformerFactory := newControllerFromClientWithClock(clientSet, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -761,11 +805,21 @@ func TestControllerSyncJob(t *testing.T) { job.Status.Active = int32(tc.initialStatus.active) job.Status.Succeeded = int32(tc.initialStatus.succeed) job.Status.Failed = int32(tc.initialStatus.failed) + if tc.initialStatus.startTime != nil { + startTime := metav1.NewTime(*tc.initialStatus.startTime) + job.Status.StartTime = &startTime + } } + key, err := controller.KeyFunc(job) if err != nil { t.Errorf("Unexpected error getting job key: %v", err) } + + if tc.backoffRecord != nil { + tc.backoffRecord.key = key + manager.backoffRecordStore.updateBackoffRecord(*tc.backoffRecord) + } if tc.fakeExpectationAtCreation < 0 { manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation)) } else if tc.fakeExpectationAtCreation > 0 { @@ -790,7 +844,7 @@ func TestControllerSyncJob(t *testing.T) { } // run - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err = manager.syncJob(context.TODO(), testutil.GetKey(job, t)) // We need requeue syncJob task if podController error if tc.podControllerError != nil { @@ -808,9 +862,6 @@ func TestControllerSyncJob(t *testing.T) { } else if err != nil { t.Errorf("Unexpected error when syncing jobs: %v", err) } - if forget != tc.jobKeyForget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget) - } // validate created/deleted pods if int32(len(fakePodControl.Templates)) != tc.expectedCreations { t.Errorf("Unexpected number of creates. 
Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) @@ -928,7 +979,7 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI } } -func TestGetStatus(t *testing.T) { +func TestGetNewFinshedPods(t *testing.T) { cases := map[string]struct { job batch.Job pods []*v1.Pod @@ -1025,7 +1076,9 @@ func TestGetStatus(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers) + succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers) + succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded)) + failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed)) if succeeded != tc.wantSucceeded { t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded) } @@ -1597,7 +1650,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if isIndexedJob(job) { succeededIndexes = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) } - err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) + err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{}) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } @@ -1742,13 +1795,10 @@ func TestSyncJobPastDeadline(t *testing.T) { setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0) // run - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } - if forget != tc.expectedForGetKey { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", tc.expectedForGetKey, forget) - } // validate created/deleted pods if int32(len(fakePodControl.Templates)) != 0 { t.Errorf("Unexpected number of creates. Expected 0, saw %d\n", len(fakePodControl.Templates)) @@ -1903,13 +1953,10 @@ func TestSingleJobFailedCondition(t *testing.T) { job.Status.StartTime = &start job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now())) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } - if forget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget) - } if len(fakePodControl.DeletePodName) != 0 { t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName)) } @@ -1937,13 +1984,10 @@ func TestSyncJobComplete(t *testing.T) { job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now())) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Fatalf("Unexpected error when syncing jobs %v", err) } - if !forget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget) - } actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { t.Fatalf("Unexpected error when trying to get job from the store: %v", err) @@ -1965,13 +2009,10 @@ func TestSyncJobDeleted(t *testing.T) { return job, nil } job := newJob(2, 2, 6, batch.NonIndexedCompletion) - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } - if !forget { - t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget) - } if len(fakePodControl.Templates) != 0 { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) } @@ -2081,7 +2122,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ - ExitCode: 42, + ExitCode: 42, + FinishedAt: testFinishedAt, }, }, }, @@ -2312,7 +2354,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ - ExitCode: 5, + ExitCode: 5, + FinishedAt: testFinishedAt, }, }, }, @@ -2507,7 +2550,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { Name: "main-container", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ - ExitCode: 5, + ExitCode: 5, + FinishedAt: testFinishedAt, }, }, }, @@ -2741,7 +2785,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { { State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ - ExitCode: 2, + ExitCode: 2, + FinishedAt: testFinishedAt, }, }, }, @@ -2803,6 +2848,15 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { Status: v1.ConditionTrue, }, }, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: testFinishedAt, + }, + }, + }, + }, }, }, }, @@ -3627,7 +3681,7 @@ func TestWatchJobs(t *testing.T) { // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. 
- manager.syncHandler = func(ctx context.Context, key string) (bool, error) { + manager.syncHandler = func(ctx context.Context, key string) error { defer close(received) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { @@ -3636,12 +3690,12 @@ func TestWatchJobs(t *testing.T) { job, err := manager.jobLister.Jobs(ns).Get(name) if err != nil || job == nil { t.Errorf("Expected to find job under key %v: %v", key, err) - return true, nil + return nil } if !apiequality.Semantic.DeepDerivative(*job, testJob) { t.Errorf("Expected %#v, but got %#v", testJob, *job) } - return true, nil + return nil } // Start only the job watcher and the workqueue, send a watch event, // and make sure it hits the sync method. @@ -3672,7 +3726,7 @@ func TestWatchPods(t *testing.T) { received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. - manager.syncHandler = func(ctx context.Context, key string) (bool, error) { + manager.syncHandler = func(ctx context.Context, key string) error { ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error getting namespace/name from key %v: %v", key, err) @@ -3684,10 +3738,10 @@ func TestWatchPods(t *testing.T) { if !apiequality.Semantic.DeepDerivative(job, testJob) { t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) close(received) - return true, nil + return nil } close(received) - return true, nil + return nil } // Start only the pod watcher and the workqueue, send a watch event, // and make sure it hits the sync method for the right job. @@ -4080,14 +4134,11 @@ func TestJobBackoffForOnFailure(t *testing.T) { } // run - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("unexpected error syncing job. Got %#v", err) } - if forget != tc.jobKeyForget { - t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget) - } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) @@ -4177,6 +4228,9 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { + pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ + FinishedAt: testFinishedAt, + }}}} podIndexer.Add(pod) } for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) { @@ -4184,14 +4238,11 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { } // run - forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) + err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if (err != nil) != tc.isExpectingAnError { t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError) } - if forget != tc.jobKeyForget { - t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget) - } // validate status if actual.Status.Active != tc.expectedActive { t.Errorf("unexpected number of active pods. 
Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index d4498c980b7..1b71ced1674 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sort" "strconv" "strings" "sync" @@ -399,7 +400,9 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi Ready: pointer.Int32(0), }) - jobPods, err := getJobPods(ctx, t, cs, jobObj) + jobPods, err := getJobPods(ctx, t, cs, jobObj, func(s v1.PodStatus) bool { + return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning) + }) if err != nil { t.Fatalf("Failed to list Job Pods: %v", err) } @@ -543,6 +546,13 @@ func TestJobPodFailurePolicy(t *testing.T) { } podStatusNotMatchingAnyRule := v1.PodStatus{ Phase: v1.PodFailed, + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }, + }, } testCases := map[string]struct { enableJobPodFailurePolicy bool @@ -1367,6 +1377,92 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) } +func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { + // overwrite the default value for faster testing + oldBackoff := jobcontroller.DefaultJobBackOff + defer func() { jobcontroller.DefaultJobBackOff = oldBackoff }() + jobcontroller.DefaultJobBackOff = 2 * time.Second + + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer cancel() + + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{}) + if err != nil { + t.Fatalf("Could not create job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Ready: pointer.Int32(0), + }) + + // Fail the first pod + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Ready: pointer.Int32(0), + Failed: 1, + }) + + // Fail the second pod + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Ready: pointer.Int32(0), + Failed: 2, + }) + + jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true }) + if err != nil { + t.Fatalf("Failed to list Job Pods: %v", err) + } + if len(jobPods) != 3 { + t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods)) + } + + creationTime := []time.Time{} + finishTime := []time.Time{} + for _, pod := range jobPods { + creationTime = append(creationTime, pod.CreationTimestamp.Time) + if len(pod.Status.ContainerStatuses) > 0 { + finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time) + } + } + + sort.Slice(creationTime, func(i, j int) bool { + return creationTime[i].Before(creationTime[j]) + }) + sort.Slice(finishTime, func(i, j int) bool { + return finishTime[i].Before(finishTime[j]) + }) + + if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobBackOff.Seconds() { + t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobBackOff) + } + 
+ if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobBackOff.Seconds() { + t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobBackOff) + } + + diff := creationTime[2].Sub(finishTime[1]).Seconds() + + // The third pod should not be created before 4 seconds + if diff < 2*jobcontroller.DefaultJobBackOff.Seconds() { + t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobBackOff) + } + + // The third pod should be created within 8 seconds + // This check rules out double counting + if diff >= 4*jobcontroller.DefaultJobBackOff.Seconds() { + t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobBackOff) + } +} + + // TestJobFailedWithInterrupts tests that a job where one pod fails and the rest // succeed is marked as Failed, even if the controller fails in the middle. func TestJobFailedWithInterrupts(t *testing.T) { @@ -1686,7 +1782,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse } } -func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) ([]*v1.Pod, error) { +func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, filter func(v1.PodStatus) bool) ([]*v1.Pod, error) { t.Helper() allPods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -1694,8 +1790,7 @@ func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface } jobPods := make([]*v1.Pod, 0, 0) for _, pod := range allPods.Items { - phase := pod.Status.Phase - if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) { + if metav1.IsControlledBy(&pod, jobObj) && filter(pod.Status) { p := pod jobPods = append(jobPods, &p) } @@ -1817,6 +1912,17 @@ func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) { op := func(p *v1.Pod) bool { p.Status.Phase = phase + if phase == v1.PodFailed || phase == v1.PodSucceeded { + p.Status.ContainerStatuses = []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + FinishedAt: metav1.Now(), + }, + }, + }, + } + } return true } return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
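
Reviewer note (not part of the patch): the change gates pod re-creation on backoffRecord.getRemainingTime, which waits DefaultJobBackOff after the first failure since the last success, doubles the wait for each further consecutive failure, caps it at MaxJobBackOff, and subtracts the time already elapsed since the last failure. The standalone Go sketch below mirrors that schedule; the helper name remainingBackoff and the main function are illustrative only, and the 5s/700s values come from the TestGetRemainingBackoffTime table above, so the printed delays should line up with the expected 5s, 20s, 640s, and 700s results.

package main

import (
	"fmt"
	"time"
)

// remainingBackoff mirrors the controller logic sketched above: zero failures
// means no wait; otherwise the wait is defaultBackoff * 2^(failures-1), capped
// at maxBackoff, minus whatever time has already passed since the last failure.
func remainingBackoff(failures int32, sinceLastFailure, defaultBackoff, maxBackoff time.Duration) time.Duration {
	if failures == 0 {
		return 0
	}
	d := defaultBackoff
	for i := int32(1); i < failures; i++ {
		d *= 2
		if d >= maxBackoff {
			d = maxBackoff
			break
		}
	}
	if d < sinceLastFailure {
		return 0
	}
	return d - sinceLastFailure
}

func main() {
	// failures=1 -> 5s, 3 -> 20s, 8 -> 640s, 9 -> capped at 700s.
	for _, failures := range []int32{1, 3, 8, 9} {
		fmt.Printf("failures=%d remaining=%v\n", failures, remainingBackoff(failures, 0, 5*time.Second, 700*time.Second))
	}
}

TestJobPodsCreatedWithExponentialBackoff above exercises the same schedule end to end with DefaultJobBackOff overridden to 2s, which is why it asserts roughly 2s and 4s gaps between consecutive pod creations.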