From c51a422d7870e19b724999bbc72155058e982b55 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 12 Jun 2023 09:31:34 +0200
Subject: [PATCH 1/2] Cleanup job controller handling of backoff

---
 pkg/controller/job/backoff_utils.go       |   2 +-
 pkg/controller/job/backoff_utils_test.go  |   2 +-
 pkg/controller/job/job_controller.go      | 132 +++++++++-------------
 pkg/controller/job/job_controller_test.go |  85 ++++----------
 4 files changed, 78 insertions(+), 143 deletions(-)

diff --git a/pkg/controller/job/backoff_utils.go b/pkg/controller/job/backoff_utils.go
index 4a3a5e7fba8..ce68468f1d0 100644
--- a/pkg/controller/job/backoff_utils.go
+++ b/pkg/controller/job/backoff_utils.go
@@ -75,7 +75,7 @@ func (s *backoffStore) removeBackoffRecord(jobId string) error {
 }
 
-func newBackoffRecordStore() *backoffStore {
+func newBackoffStore() *backoffStore {
 	return &backoffStore{
 		store: cache.NewStore(backoffRecordKeyFunc),
 	}
 }

diff --git a/pkg/controller/job/backoff_utils_test.go b/pkg/controller/job/backoff_utils_test.go
index 00659c4bb4d..68077c44f3d 100644
--- a/pkg/controller/job/backoff_utils_test.go
+++ b/pkg/controller/job/backoff_utils_test.go
@@ -148,7 +148,7 @@ func TestNewBackoffRecord(t *testing.T) {
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
-			backoffRecordStore := newBackoffRecordStore()
+			backoffRecordStore := newBackoffStore()
 			tc.storeInitializer(backoffRecordStore)
 			newSucceededPods := []*v1.Pod{}

diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 9c2d152e974..aad4c665dfb 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -19,7 +19,6 @@ package job
 import (
 	"context"
 	"fmt"
-	"math"
 	"reflect"
 	"sort"
 	"sync"
@@ -57,9 +56,8 @@ import (
 	"k8s.io/utils/pointer"
 )
 
-// podUpdateBatchPeriod is the batch period to hold pod updates before syncing
-// a Job. It is used if the feature gate JobReadyPods is enabled.
-const podUpdateBatchPeriod = time.Second
+// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
+const syncJobBatchPeriod = time.Second
 
 const (
 	// PodFailurePolicy reason indicates a job failure condition is added due to
@@ -123,11 +121,13 @@ type Controller struct {
 	broadcaster record.EventBroadcaster
 	recorder    record.EventRecorder
 
-	podUpdateBatchPeriod time.Duration
+	syncJobBatchPeriod time.Duration
 
 	clock clock.WithTicker
 
-	backoffRecordStore *backoffStore
+	// Store with information to compute the exponential backoff delay for pod
+	// recreation in case of pod failures.
+	podBackoffStore *backoffStore
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -153,15 +153,13 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		broadcaster: eventBroadcaster,
 		recorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		clock:       clock,
-		backoffRecordStore: newBackoffRecordStore(),
-	}
-	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
-		jm.podUpdateBatchPeriod = podUpdateBatchPeriod
+		podBackoffStore: newBackoffStore(),
 	}
+	jm.syncJobBatchPeriod = syncJobBatchPeriod
 
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			jm.enqueueController(logger, obj, true)
+			jm.enqueueSyncJobImmediately(logger, obj)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			jm.updateJob(logger, oldObj, newObj)
 		},
@@ -286,7 +284,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
-		jm.enqueueControllerPodUpdate(logger, job, true)
+		jm.enqueueSyncJobBatched(logger, job)
 		return
 	}
 
@@ -300,7 +298,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
 	for _, job := range jm.getPodJobs(pod) {
-		jm.enqueueControllerPodUpdate(logger, job, true)
+		jm.enqueueSyncJobBatched(logger, job)
 	}
 }
 
@@ -325,11 +323,6 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 		return
 	}
 
-	// the only time we want the backoff to kick-in, is when the pod failed for the first time.
-	// we don't want to re-calculate backoff for an update event when the tracking finalizer
-	// for a failed pod is removed.
-	immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
-
 	// Don't check if oldPod has the finalizer, as during ownership transfer
 	// finalizers might be re-added and removed again in behalf of the new owner.
 	// If all those Pod updates collapse into a single event, the finalizer
@@ -348,7 +341,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 				jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
 			}
 		}
-		jm.enqueueControllerPodUpdate(logger, job, immediate)
+		jm.enqueueSyncJobBatched(logger, job)
 	}
 }
 
@@ -364,7 +357,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 				jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
 			}
 		}
-		jm.enqueueControllerPodUpdate(logger, job, immediate)
+		jm.enqueueSyncJobBatched(logger, job)
 		return
 	}
 
@@ -378,7 +371,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
 		for _, job := range jm.getPodJobs(curPod) {
-			jm.enqueueControllerPodUpdate(logger, job, immediate)
+			jm.enqueueSyncJobBatched(logger, job)
 		}
 	}
 }
@@ -438,7 +431,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
 		jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
 	}
 
-	jm.enqueueControllerPodUpdate(logger, job, true)
+	jm.enqueueSyncJobBatched(logger, job)
 }
 
 func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
@@ -454,10 +447,11 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 	if curJob.Generation == oldJob.Generation {
 		// Delay the Job sync when no generation change to batch Job status updates,
 		// typically triggered by pod events.
-		jm.enqueueControllerPodUpdate(logger, curJob, true)
+		jm.enqueueSyncJobBatched(logger, curJob)
 	} else {
 		// Trigger immediate sync when spec is changed.
-		jm.enqueueController(logger, curJob, true)
+		jm.enqueueSyncJobImmediately(logger, curJob)
+
 	}
 
 	// check if need to add a new rsync for ActiveDeadlineSeconds
@@ -480,7 +474,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 // deleteJob enqueues the job and all the pods associated with it that still
 // have a finalizer.
 func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
-	jm.enqueueController(logger, obj, true)
+	jm.enqueueSyncJobImmediately(logger, obj)
 	jobObj, ok := obj.(*batch.Job)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -508,31 +502,41 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
 	}
 }
 
-// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
-// immediate tells the controller to update the status right away, and should
-// happen ONLY when there was a successful pod run.
-func (jm *Controller) enqueueController(logger klog.Logger, obj interface{}, immediate bool) {
-	jm.enqueueControllerDelayed(logger, obj, immediate, 0)
+// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
+// immediately.
+// It is only used for Job events (creation, deletion, spec update).
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {
+	jm.enqueueSyncJobInternal(logger, obj, 0)
 }
 
-func (jm *Controller) enqueueControllerPodUpdate(logger klog.Logger, obj interface{}, immediate bool) {
-	jm.enqueueControllerDelayed(logger, obj, immediate, jm.podUpdateBatchPeriod)
+// enqueueSyncJobBatched tells the controller to invoke syncJob with a
+// constant batching delay.
+// It is used for:
+// - Pod events (creation, deletion, update)
+// - Job status update
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
+	jm.enqueueSyncJobInternal(logger, obj, jm.syncJobBatchPeriod)
 }
 
-func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface{}, immediate bool, delay time.Duration) {
+// enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
+// custom delay, but not smaller than the batching delay.
+// It is used when pod recreations are delayed due to pod failures.
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
+	if delay < jm.syncJobBatchPeriod {
+		delay = jm.syncJobBatchPeriod
+	}
+	jm.enqueueSyncJobInternal(logger, obj, delay)
+}
+
+func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
-
-	backoff := delay
-	if !immediate {
-		if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
-			backoff = calculatedBackoff
-		}
-	}
-
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
 	// deterministically avoid syncing controllers that fight over pods. Currently, we only
 	// ensure that the same controller is synced for a given pod. When we periodically relist
 	// by querying the store for all controllers that this rc overlaps, as well as all
 	// controllers that overlap this rc, and sorting them.
logger.Info("enqueueing job", "key", key) - jm.queue.AddAfter(key, backoff) + jm.queue.AddAfter(key, delay) } func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) { @@ -711,7 +715,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { jm.expectations.DeleteExpectations(key) jm.finalizerExpectations.deleteExpectations(logger, key) - err := jm.backoffRecordStore.removeBackoffRecord(key) + err := jm.podBackoffStore.removeBackoffRecord(key) if err != nil { // re-syncing here as the record has to be removed for finished/deleted jobs return fmt.Errorf("error removing backoff record %w", err) @@ -725,7 +729,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // if job was finished previously, we don't want to redo the termination if IsJobFinished(&job) { - err := jm.backoffRecordStore.removeBackoffRecord(key) + err := jm.podBackoffStore.removeBackoffRecord(key) if err != nil { // re-syncing here as the record has to be removed for finished/deleted jobs return fmt.Errorf("error removing backoff record %w", err) @@ -783,7 +787,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { job.Status.StartTime = &now } - newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(key, newSucceededPods, newFailedPods) + newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods) var manageJobErr error var finishedCondition *batch.JobCondition @@ -836,7 +840,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { } else { manageJobCalled := false if satisfiedExpectations && job.DeletionTimestamp == nil { - active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffInfo) + active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord) manageJobCalled = true } complete := false @@ -892,14 +896,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) job.Status.Active = active job.Status.Ready = ready - err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo) + err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord) if err != nil { - if apierrors.IsConflict(err) { - // we probably have a stale informer cache - // so don't return an error to avoid backoff - jm.enqueueController(logger, &job, false) - return nil - } return fmt.Errorf("tracking status: %w", err) } @@ -1120,7 +1118,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } - err = jm.backoffRecordStore.updateBackoffRecord(newBackoffRecord) + err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord) if err != nil { // this error might undercount the backoff. @@ -1376,7 +1374,7 @@ func jobSuspended(job *batch.Job) bool { // pods according to what is specified in the job.Spec. // Respects back-off; does not create new pods if the back-off time has not passed // Does NOT modify . 
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
 	logger := klog.FromContext(ctx)
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
@@ -1438,9 +1436,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	}
 
 	if active < wantActive {
-		remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
+		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
 		if remainingTime > 0 {
-			jm.enqueueControllerDelayed(logger, job, true, remainingTime)
+			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
 		}
 		diff := wantActive - active
@@ -1569,26 +1567,6 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 	return err
 }
 
-func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
-	exp := queue.NumRequeues(key)
-
-	if exp <= 0 {
-		return time.Duration(0)
-	}
-
-	// The backoff is capped such that 'calculated' value never overflows.
-	backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
-	if backoff > math.MaxInt64 {
-		return MaxJobBackOff
-	}
-
-	calculated := time.Duration(backoff)
-	if calculated > MaxJobBackOff {
-		return MaxJobBackOff
-	}
-	return calculated
-}
-
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.

diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 16d0aae465e..2431eb0d23c 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -821,7 +821,7 @@ func TestControllerSyncJob(t *testing.T) {
 		if tc.backoffRecord != nil {
 			tc.backoffRecord.key = key
-			manager.backoffRecordStore.updateBackoffRecord(*tc.backoffRecord)
+			manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord)
 		}
 		if tc.fakeExpectationAtCreation < 0 {
 			manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation))
 		}
@@ -3109,18 +3109,17 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	defer func() { DefaultJobBackOff = 10 * time.Second }()
 	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
-		updateErr      error
-		wantRequeue    bool
-		withFinalizers bool
+		updateErr               error
+		wantRequeuedImmediately bool
 	}{
 		"no error": {},
 		"generic error": {
-			updateErr:   fmt.Errorf("update error"),
-			wantRequeue: true,
+			updateErr:               fmt.Errorf("update error"),
+			wantRequeuedImmediately: true,
 		},
 		"conflict error": {
-			updateErr:   apierrors.NewConflict(schema.GroupResource{}, "", nil),
-			wantRequeue: true,
+			updateErr:               apierrors.NewConflict(schema.GroupResource{}, "", nil),
+			wantRequeuedImmediately: true,
 		},
 	}
 	for name, tc := range cases {
@@ -3139,8 +3138,8 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 		manager.processNextWorkItem(context.TODO())
 		// With DefaultJobBackOff=0, the queueing is synchronous.
 		requeued := manager.queue.Len() > 0
-		if requeued != tc.wantRequeue {
-			t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeue)
+		if requeued != tc.wantRequeuedImmediately {
+			t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
 		}
 		if requeued {
 			key, _ := manager.queue.Get()
@@ -3390,7 +3389,7 @@ func TestAddPod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3438,7 +3437,7 @@ func TestAddPodOrphan(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3470,7 +3469,7 @@ func TestUpdatePod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3522,7 +3521,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3553,7 +3552,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3583,7 +3582,7 @@ func TestUpdatePodRelease(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3612,7 +3611,7 @@ func TestDeletePod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -3660,7 +3659,7 @@ func TestDeletePodOrphan(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 	// Disable batching of pod updates.
-	jm.podUpdateBatchPeriod = 0
+	jm.syncJobBatchPeriod = 0
 
 	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
 	job1.Name = "job1"
@@ -4013,58 +4012,16 @@ func TestJobBackoff(t *testing.T) {
 		jobReadyPodsEnabled bool
 		wantBackoff         time.Duration
 	}{
-		"1st failure": {
+		"failure": {
 			requeues:    0,
 			phase:       v1.PodFailed,
-			wantBackoff: 0,
+			wantBackoff: syncJobBatchPeriod,
 		},
-		"2nd failure": {
-			requeues:    1,
-			phase:       v1.PodFailed,
-			wantBackoff: DefaultJobBackOff,
-		},
-		"3rd failure": {
-			requeues:    2,
-			phase:       v1.PodFailed,
-			wantBackoff: 2 * DefaultJobBackOff,
-		},
-		"1st success": {
-			requeues:    0,
-			phase:       v1.PodSucceeded,
-			wantBackoff: 0,
-		},
-		"2nd success": {
-			requeues:    1,
-			phase:       v1.PodSucceeded,
-			wantBackoff: 0,
-		},
-		"1st running": {
-			requeues:    0,
-			phase:       v1.PodSucceeded,
-			wantBackoff: 0,
-		},
-		"2nd running": {
-			requeues:    1,
-			phase:       v1.PodSucceeded,
-			wantBackoff: 0,
-		},
-		"1st failure with pod updates batching": {
+		"failure with pod updates batching": {
 			requeues:            0,
 			phase:               v1.PodFailed,
 			jobReadyPodsEnabled: true,
-			wantBackoff:         podUpdateBatchPeriod,
-		},
-		"2nd failure with pod updates batching": {
-			requeues:            1,
-			phase:               v1.PodFailed,
-			jobReadyPodsEnabled: true,
-			wantBackoff:         DefaultJobBackOff,
-		},
-		"Failed pod observed again": {
-			requeues:    1,
-			oldPodPhase: v1.PodFailed,
-			phase:       v1.PodFailed,
-			wantBackoff: 0,
+			wantBackoff:         syncJobBatchPeriod,
 		},
 	}

From 74c5ff97f187813ee6aad21769e99a42b079243c Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Fri, 16 Jun 2023 16:04:28 +0200
Subject: [PATCH 2/2] Lower the constants for the rate limiter in Job
 controller

---
 pkg/controller/job/job_controller.go      | 18 +++++++++++-------
 pkg/controller/job/job_controller_test.go | 10 +++++-----
 test/integration/job/job_test.go          | 22 +++++++++++-----------
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index aad4c665dfb..46b41aec307 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -69,10 +69,14 @@ const (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-	// DefaultJobBackOff is the default backoff period. Exported for tests.
-	DefaultJobBackOff = 10 * time.Second
-	// MaxJobBackOff is the max backoff period. Exported for tests.
-	MaxJobBackOff = 360 * time.Second
+	// DefaultJobApiBackOff is the default backoff period. Exported for tests.
+	DefaultJobApiBackOff = 1 * time.Second
+	// MaxJobApiBackOff is the max backoff period. Exported for tests.
+	MaxJobApiBackOff = 60 * time.Second
+	// DefaultJobPodFailureBackOff is the default backoff period. Exported for tests.
+	DefaultJobPodFailureBackOff = 10 * time.Second
+	// MaxJobPodFailureBackOff is the max backoff period. Exported for tests.
+	MaxJobPodFailureBackOff = 360 * time.Second
 	// MaxUncountedPods is the maximum size the slices in
 	// .status.uncountedTerminatedPods should have to keep their representation
 	// roughly below 20 KB. Exported for tests
@@ -148,8 +152,8 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		},
 		expectations:          controller.NewControllerExpectations(),
 		finalizerExpectations: newUIDTrackingExpectations(),
-		queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
-		orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
+		queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
+		orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
 		broadcaster:           eventBroadcaster,
 		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		clock:                 clock,
@@ -1436,7 +1440,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	}
 
 	if active < wantActive {
-		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
+		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
 		if remainingTime > 0 {
 			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
 		}

diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 2431eb0d23c..17c738122d6 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -3106,8 +3106,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	defer func() { DefaultJobBackOff = 10 * time.Second }()
-	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+	defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+	DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
 		updateErr               error
 		wantRequeuedImmediately bool
@@ -3136,7 +3136,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 		manager.queue.Add(testutil.GetKey(job, t))
 		manager.processNextWorkItem(context.TODO())
-		// With DefaultJobBackOff=0, the queueing is synchronous.
+		// With DefaultJobApiBackOff=0, the queueing is synchronous.
 		requeued := manager.queue.Len() > 0
 		if requeued != tc.wantRequeuedImmediately {
 			t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
@@ -3934,8 +3934,8 @@ func TestJobBackoffReset(t *testing.T) {
 
 	for name, tc := range testCases {
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-		defer func() { DefaultJobBackOff = 10 * time.Second }()
-		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+		defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+		DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
 		manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
 		manager.podControl = &fakePodControl

diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 999e58b4ac7..532325e7178 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -1379,9 +1379,9 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 
 func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
 	// overwrite the default value for faster testing
-	oldBackoff := jobcontroller.DefaultJobBackOff
-	defer func() { jobcontroller.DefaultJobBackOff = oldBackoff }()
-	jobcontroller.DefaultJobBackOff = 2 * time.Second
+	oldBackoff := jobcontroller.DefaultJobPodFailureBackOff
+	defer func() { jobcontroller.DefaultJobPodFailureBackOff = oldBackoff }()
+	jobcontroller.DefaultJobPodFailureBackOff = 2 * time.Second
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
@@ -1441,25 +1441,25 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
 		return finishTime[i].Before(finishTime[j])
 	})
 
-	if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobBackOff)
+	if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobPodFailureBackOff)
 	}
 
-	if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobBackOff)
+	if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 
 	diff := creationTime[2].Sub(finishTime[1]).Seconds()
 
 	// The third pod should not be created before 4 seconds
-	if diff < 2*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobBackOff)
+	if diff < 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 
 	// The third pod should be created within 8 seconds
 	// This check rules out double counting
-	if diff >= 4*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobBackOff)
+	if diff >= 4*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 }
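
Note (not part of the patch series): the second patch splits the old DefaultJobBackOff/MaxJobBackOff pair into API-retry constants (fed to the workqueue rate limiter) and pod-failure constants (fed to backoffRecord.getRemainingTime). As a rough, simplified sketch of the pod-recreation side only — the real getRemainingTime in backoff_utils.go also subtracts the time already elapsed since the last failure, and consecutiveFailures here stands in for the record's failure counter — the delay grows exponentially and is capped at the max:

	// Illustrative only; assumes the constants introduced by PATCH 2/2.
	package main

	import (
		"fmt"
		"time"
	)

	func podFailureDelay(consecutiveFailures int, defaultBackoff, maxBackoff time.Duration) time.Duration {
		if consecutiveFailures == 0 {
			// No failures since the last success: create the pod without delay.
			return 0
		}
		// defaultBackoff, 2*defaultBackoff, 4*defaultBackoff, ...
		delay := defaultBackoff << (consecutiveFailures - 1)
		if delay < 0 || delay > maxBackoff {
			// Cap the delay (and guard against shift overflow).
			return maxBackoff
		}
		return delay
	}

	func main() {
		// With DefaultJobPodFailureBackOff=10s and MaxJobPodFailureBackOff=360s:
		// 0s, 10s, 20s, 40s, 80s, 160s, 320s, 360s, 360s, ...
		for failures := 0; failures <= 8; failures++ {
			fmt.Printf("failures=%d delay=%v\n", failures, podFailureDelay(failures, 10*time.Second, 360*time.Second))
		}
	}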