Merge pull request #118615 from mimowo/job-controller-backoff-cleanup

Cleanup job controller handling of backoff
Kubernetes Prow Robot authored on 2023-06-16 08:58:19 -07:00; committed by GitHub
Commit cef13f11fd
5 changed files with 104 additions and 165 deletions
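
In summary, this change splits the former DefaultJobBackOff/MaxJobBackOff pair into two independent backoff configurations: an API backoff used by the controller's rate-limited work queues when syncJob has to be retried, and a pod-failure backoff used to delay pod recreation after pod failures. A minimal sketch of the resulting exported variables, with names and values taken from the job_controller.go diff below (the comments here are paraphrased, not the committed ones):

package job

import "time"

var (
    // Backoff applied by the rate-limited work queues when syncJob returns
    // an error and the key is requeued.
    DefaultJobApiBackOff = 1 * time.Second
    MaxJobApiBackOff     = 60 * time.Second

    // Backoff applied before recreating pods after pod failures, tracked
    // per Job in the controller's podBackoffStore.
    DefaultJobPodFailureBackOff = 10 * time.Second
    MaxJobPodFailureBackOff     = 360 * time.Second
)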

File: pkg/controller/job/backoff_utils.go

@@ -75,7 +75,7 @@ func (s *backoffStore) removeBackoffRecord(jobId string) error {
 }
 
-func newBackoffRecordStore() *backoffStore {
+func newBackoffStore() *backoffStore {
     return &backoffStore{
         store: cache.NewStore(backoffRecordKeyFunc),
     }

File: pkg/controller/job/backoff_utils_test.go

@@ -148,7 +148,7 @@ func TestNewBackoffRecord(t *testing.T) {
     for name, tc := range testCases {
         t.Run(name, func(t *testing.T) {
-            backoffRecordStore := newBackoffRecordStore()
+            backoffRecordStore := newBackoffStore()
             tc.storeInitializer(backoffRecordStore)
 
             newSucceededPods := []*v1.Pod{}

File: pkg/controller/job/job_controller.go

@@ -19,7 +19,6 @@ package job
 import (
     "context"
     "fmt"
-    "math"
     "reflect"
     "sort"
     "sync"
@@ -57,9 +56,8 @@ import (
     "k8s.io/utils/pointer"
 )
 
-// podUpdateBatchPeriod is the batch period to hold pod updates before syncing
-// a Job. It is used if the feature gate JobReadyPods is enabled.
-const podUpdateBatchPeriod = time.Second
+// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
+const syncJobBatchPeriod = time.Second
 
 const (
     // PodFailurePolicy reason indicates a job failure condition is added due to
@@ -71,10 +69,14 @@ const (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-    // DefaultJobBackOff is the default backoff period. Exported for tests.
-    DefaultJobBackOff = 10 * time.Second
-    // MaxJobBackOff is the max backoff period. Exported for tests.
-    MaxJobBackOff = 360 * time.Second
+    // DefaultJobApiBackOff is the default backoff period. Exported for tests.
+    DefaultJobApiBackOff = 1 * time.Second
+    // MaxJobApiBackOff is the max backoff period. Exported for tests.
+    MaxJobApiBackOff = 60 * time.Second
+    // DefaultJobPodFailureBackOff is the default backoff period. Exported for tests.
+    DefaultJobPodFailureBackOff = 10 * time.Second
+    // MaxJobPodFailureBackOff is the max backoff period. Exported for tests.
+    MaxJobPodFailureBackOff = 360 * time.Second
     // MaxUncountedPods is the maximum size the slices in
     // .status.uncountedTerminatedPods should have to keep their representation
     // roughly below 20 KB. Exported for tests
@@ -123,11 +125,13 @@ type Controller struct {
     broadcaster record.EventBroadcaster
     recorder    record.EventRecorder
 
-    podUpdateBatchPeriod time.Duration
+    syncJobBatchPeriod time.Duration
 
     clock clock.WithTicker
 
-    backoffRecordStore *backoffStore
+    // Store with information to compute the exponential backoff delay for pod
+    // recreation in case of pod failures.
+    podBackoffStore *backoffStore
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -148,20 +152,18 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
         },
         expectations:          controller.NewControllerExpectations(),
         finalizerExpectations: newUIDTrackingExpectations(),
-        queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
-        orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
+        queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
+        orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
         broadcaster:           eventBroadcaster,
         recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
         clock:                 clock,
-        backoffRecordStore:    newBackoffRecordStore(),
+        podBackoffStore:       newBackoffStore(),
     }
-    if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
-        jm.podUpdateBatchPeriod = podUpdateBatchPeriod
-    }
+    jm.syncJobBatchPeriod = syncJobBatchPeriod
 
     jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj interface{}) {
-            jm.enqueueController(logger, obj, true)
+            jm.enqueueSyncJobImmediately(logger, obj)
         },
         UpdateFunc: func(oldObj, newObj interface{}) {
             jm.updateJob(logger, oldObj, newObj)
@@ -286,7 +288,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
             return
         }
         jm.expectations.CreationObserved(jobKey)
-        jm.enqueueControllerPodUpdate(logger, job, true)
+        jm.enqueueSyncJobBatched(logger, job)
         return
     }
@@ -300,7 +302,7 @@ func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
     // DO NOT observe creation because no controller should be waiting for an
     // orphan.
     for _, job := range jm.getPodJobs(pod) {
-        jm.enqueueControllerPodUpdate(logger, job, true)
+        jm.enqueueSyncJobBatched(logger, job)
     }
 }
@@ -325,11 +327,6 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
         return
     }
 
-    // the only time we want the backoff to kick-in, is when the pod failed for the first time.
-    // we don't want to re-calculate backoff for an update event when the tracking finalizer
-    // for a failed pod is removed.
-    immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
-
     // Don't check if oldPod has the finalizer, as during ownership transfer
     // finalizers might be re-added and removed again in behalf of the new owner.
     // If all those Pod updates collapse into a single event, the finalizer
@@ -348,7 +345,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
                 jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
             }
         }
-        jm.enqueueControllerPodUpdate(logger, job, immediate)
+        jm.enqueueSyncJobBatched(logger, job)
     }
 }
@@ -364,7 +361,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
             jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
         }
     }
-    jm.enqueueControllerPodUpdate(logger, job, immediate)
+    jm.enqueueSyncJobBatched(logger, job)
     return
 }
@@ -378,7 +375,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
     labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
     if labelChanged || controllerRefChanged {
         for _, job := range jm.getPodJobs(curPod) {
-            jm.enqueueControllerPodUpdate(logger, job, immediate)
+            jm.enqueueSyncJobBatched(logger, job)
         }
     }
 }
@@ -438,7 +435,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
         jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
     }
 
-    jm.enqueueControllerPodUpdate(logger, job, true)
+    jm.enqueueSyncJobBatched(logger, job)
 }
 
 func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
@@ -454,10 +451,11 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
     if curJob.Generation == oldJob.Generation {
         // Delay the Job sync when no generation change to batch Job status updates,
         // typically triggered by pod events.
-        jm.enqueueControllerPodUpdate(logger, curJob, true)
+        jm.enqueueSyncJobBatched(logger, curJob)
     } else {
         // Trigger immediate sync when spec is changed.
-        jm.enqueueController(logger, curJob, true)
+        jm.enqueueSyncJobImmediately(logger, curJob)
     }
 
     // check if need to add a new rsync for ActiveDeadlineSeconds
@@ -480,7 +478,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 // deleteJob enqueues the job and all the pods associated with it that still
 // have a finalizer.
 func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
-    jm.enqueueController(logger, obj, true)
+    jm.enqueueSyncJobImmediately(logger, obj)
     jobObj, ok := obj.(*batch.Job)
     if !ok {
         tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -508,31 +506,41 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
     }
 }
 
-// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
-// immediate tells the controller to update the status right away, and should
-// happen ONLY when there was a successful pod run.
-func (jm *Controller) enqueueController(logger klog.Logger, obj interface{}, immediate bool) {
-    jm.enqueueControllerDelayed(logger, obj, immediate, 0)
+// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
+// immediately.
+// It is only used for Job events (creation, deletion, spec update).
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interface{}) {
+    jm.enqueueSyncJobInternal(logger, obj, 0)
 }
 
-func (jm *Controller) enqueueControllerPodUpdate(logger klog.Logger, obj interface{}, immediate bool) {
-    jm.enqueueControllerDelayed(logger, obj, immediate, jm.podUpdateBatchPeriod)
+// enqueueSyncJobBatched tells the controller to invoke syncJob with a
+// constant batching delay.
+// It is used for:
+// - Pod events (creation, deletion, update)
+// - Job status update
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
+    jm.enqueueSyncJobInternal(logger, obj, jm.syncJobBatchPeriod)
 }
 
-func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface{}, immediate bool, delay time.Duration) {
+// enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
+// custom delay, but not smaller than the batching delay.
+// It is used when pod recreations are delayed due to pod failures.
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
+func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
+    if delay < jm.syncJobBatchPeriod {
+        delay = jm.syncJobBatchPeriod
+    }
+    jm.enqueueSyncJobInternal(logger, obj, delay)
+}
+
+func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}, delay time.Duration) {
     key, err := controller.KeyFunc(obj)
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
         return
     }
 
-    backoff := delay
-    if !immediate {
-        if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
-            backoff = calculatedBackoff
-        }
-    }
-
     // TODO: Handle overlapping controllers better. Either disallow them at admission time or
     // deterministically avoid syncing controllers that fight over pods. Currently, we only
     // ensure that the same controller is synced for a given pod. When we periodically relist
@@ -540,7 +548,7 @@ func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface
     // by querying the store for all controllers that this rc overlaps, as well as all
     // controllers that overlap this rc, and sorting them.
     logger.Info("enqueueing job", "key", key)
-    jm.queue.AddAfter(key, backoff)
+    jm.queue.AddAfter(key, delay)
 }
 
 func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
@@ -711,7 +719,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
         jm.expectations.DeleteExpectations(key)
         jm.finalizerExpectations.deleteExpectations(logger, key)
 
-        err := jm.backoffRecordStore.removeBackoffRecord(key)
+        err := jm.podBackoffStore.removeBackoffRecord(key)
         if err != nil {
             // re-syncing here as the record has to be removed for finished/deleted jobs
             return fmt.Errorf("error removing backoff record %w", err)
@@ -725,7 +733,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
     // if job was finished previously, we don't want to redo the termination
     if IsJobFinished(&job) {
-        err := jm.backoffRecordStore.removeBackoffRecord(key)
+        err := jm.podBackoffStore.removeBackoffRecord(key)
         if err != nil {
             // re-syncing here as the record has to be removed for finished/deleted jobs
             return fmt.Errorf("error removing backoff record %w", err)
@@ -783,7 +791,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
         job.Status.StartTime = &now
     }
 
-    newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+    newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
     var manageJobErr error
     var finishedCondition *batch.JobCondition
@@ -836,7 +844,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
     } else {
         manageJobCalled := false
         if satisfiedExpectations && job.DeletionTimestamp == nil {
-            active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffInfo)
+            active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
             manageJobCalled = true
         }
         complete := false
@@ -892,14 +900,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
     needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
     job.Status.Active = active
     job.Status.Ready = ready
-    err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffInfo)
+    err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
     if err != nil {
-        if apierrors.IsConflict(err) {
-            // we probably have a stale informer cache
-            // so don't return an error to avoid backoff
-            jm.enqueueController(logger, &job, false)
-            return nil
-        }
         return fmt.Errorf("tracking status: %w", err)
     }
@@ -1120,7 +1122,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
             return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
         }
 
-        err = jm.backoffRecordStore.updateBackoffRecord(newBackoffRecord)
+        err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord)
 
         if err != nil {
             // this error might undercount the backoff.
@@ -1376,7 +1378,7 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
     logger := klog.FromContext(ctx)
     active := int32(len(activePods))
     parallelism := *job.Spec.Parallelism
@@ -1438,9 +1440,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
     }
 
     if active < wantActive {
-        remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
+        remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
         if remainingTime > 0 {
-            jm.enqueueControllerDelayed(logger, job, true, remainingTime)
+            jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
             return 0, metrics.JobSyncActionPodsCreated, nil
         }
         diff := wantActive - active
@@ -1569,26 +1571,6 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
     return err
 }
 
-func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
-    exp := queue.NumRequeues(key)
-
-    if exp <= 0 {
-        return time.Duration(0)
-    }
-
-    // The backoff is capped such that 'calculated' value never overflows.
-    backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
-    if backoff > math.MaxInt64 {
-        return MaxJobBackOff
-    }
-
-    calculated := time.Duration(backoff)
-    if calculated > MaxJobBackOff {
-        return MaxJobBackOff
-    }
-    return calculated
-}
-
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
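
Taken together, the enqueue helpers above reduce every call site to one of three delay policies: no delay for Job object events, the constant syncJobBatchPeriod for pod and status events, and a pod-failure delay that is never shorter than the batch period. A standalone sketch of that selection logic; chooseSyncDelay is an illustrative helper, not something added by this commit:

package main

import (
    "fmt"
    "time"
)

const syncJobBatchPeriod = time.Second

// chooseSyncDelay mirrors the policy encoded by the new enqueue helpers:
// Job events sync immediately, pod/status events are batched, and delays
// requested for pod failures are clamped up to at least the batch period.
func chooseSyncDelay(jobEvent bool, podFailureDelay time.Duration) time.Duration {
    if jobEvent {
        return 0 // enqueueSyncJobImmediately
    }
    if podFailureDelay > syncJobBatchPeriod {
        return podFailureDelay // enqueueSyncJobWithDelay
    }
    return syncJobBatchPeriod // enqueueSyncJobBatched, or a clamped short delay
}

func main() {
    fmt.Println(chooseSyncDelay(true, 0))                     // 0s: Job add/delete/spec update
    fmt.Println(chooseSyncDelay(false, 0))                    // 1s: pod event or status update
    fmt.Println(chooseSyncDelay(false, 500*time.Millisecond)) // 1s: short failure delay is clamped
    fmt.Println(chooseSyncDelay(false, 20*time.Second))       // 20s: remaining pod-failure backoff
}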

File: pkg/controller/job/job_controller_test.go

@@ -821,7 +821,7 @@ func TestControllerSyncJob(t *testing.T) {
             if tc.backoffRecord != nil {
                 tc.backoffRecord.key = key
-                manager.backoffRecordStore.updateBackoffRecord(*tc.backoffRecord)
+                manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord)
             }
             if tc.fakeExpectationAtCreation < 0 {
                 manager.expectations.ExpectDeletions(key, int(-tc.fakeExpectationAtCreation))
@@ -3106,21 +3106,20 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 func TestSyncJobUpdateRequeue(t *testing.T) {
     _, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    defer func() { DefaultJobBackOff = 10 * time.Second }()
-    DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+    defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+    DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
     cases := map[string]struct {
         updateErr               error
-        wantRequeue             bool
-        withFinalizers          bool
+        wantRequeuedImmediately bool
     }{
         "no error": {},
         "generic error": {
             updateErr:               fmt.Errorf("update error"),
-            wantRequeue:             true,
+            wantRequeuedImmediately: true,
         },
         "conflict error": {
             updateErr:               apierrors.NewConflict(schema.GroupResource{}, "", nil),
-            wantRequeue:             true,
+            wantRequeuedImmediately: true,
         },
     }
     for name, tc := range cases {
@@ -3137,10 +3136,10 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
             sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
             manager.queue.Add(testutil.GetKey(job, t))
             manager.processNextWorkItem(context.TODO())
-            // With DefaultJobBackOff=0, the queueing is synchronous.
+            // With DefaultJobApiBackOff=0, the queueing is synchronous.
             requeued := manager.queue.Len() > 0
-            if requeued != tc.wantRequeue {
-                t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeue)
+            if requeued != tc.wantRequeuedImmediately {
+                t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
             }
             if requeued {
                 key, _ := manager.queue.Get()
@@ -3390,7 +3389,7 @@ func TestAddPod(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3438,7 +3437,7 @@ func TestAddPodOrphan(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3470,7 +3469,7 @@ func TestUpdatePod(t *testing.T) {
    jm.podStoreSynced = alwaysReady
    jm.jobStoreSynced = alwaysReady
    // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3522,7 +3521,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3553,7 +3552,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3583,7 +3582,7 @@ func TestUpdatePodRelease(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3612,7 +3611,7 @@ func TestDeletePod(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3660,7 +3659,7 @@ func TestDeletePodOrphan(t *testing.T) {
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
     // Disable batching of pod updates.
-    jm.podUpdateBatchPeriod = 0
+    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3935,8 +3934,8 @@ func TestJobBackoffReset(t *testing.T) {
     for name, tc := range testCases {
         clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-        defer func() { DefaultJobBackOff = 10 * time.Second }()
-        DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+        defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+        DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
         manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
         fakePodControl := controller.FakePodControl{}
         manager.podControl = &fakePodControl
@@ -4013,58 +4012,16 @@ func TestJobBackoff(t *testing.T) {
         jobReadyPodsEnabled bool
         wantBackoff         time.Duration
     }{
-        "1st failure": {
+        "failure": {
             requeues:    0,
             phase:       v1.PodFailed,
-            wantBackoff: 0,
+            wantBackoff: syncJobBatchPeriod,
         },
-        "2nd failure": {
-            requeues:    1,
-            phase:       v1.PodFailed,
-            wantBackoff: DefaultJobBackOff,
-        },
-        "3rd failure": {
-            requeues:    2,
-            phase:       v1.PodFailed,
-            wantBackoff: 2 * DefaultJobBackOff,
-        },
-        "1st success": {
-            requeues:    0,
-            phase:       v1.PodSucceeded,
-            wantBackoff: 0,
-        },
-        "2nd success": {
-            requeues:    1,
-            phase:       v1.PodSucceeded,
-            wantBackoff: 0,
-        },
-        "1st running": {
-            requeues:    0,
-            phase:       v1.PodSucceeded,
-            wantBackoff: 0,
-        },
-        "2nd running": {
-            requeues:    1,
-            phase:       v1.PodSucceeded,
-            wantBackoff: 0,
-        },
-        "1st failure with pod updates batching": {
+        "failure with pod updates batching": {
             requeues:            0,
             phase:               v1.PodFailed,
             jobReadyPodsEnabled: true,
-            wantBackoff:         podUpdateBatchPeriod,
-        },
-        "2nd failure with pod updates batching": {
-            requeues:            1,
-            phase:               v1.PodFailed,
-            jobReadyPodsEnabled: true,
-            wantBackoff:         DefaultJobBackOff,
-        },
-        "Failed pod observed again": {
-            requeues:    1,
-            oldPodPhase: v1.PodFailed,
-            phase:       v1.PodFailed,
-            wantBackoff: 0,
+            wantBackoff:         syncJobBatchPeriod,
         },
     }

File: test/integration/job/job_test.go

@@ -1379,9 +1379,9 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
     // overwrite the default value for faster testing
-    oldBackoff := jobcontroller.DefaultJobBackOff
-    defer func() { jobcontroller.DefaultJobBackOff = oldBackoff }()
-    jobcontroller.DefaultJobBackOff = 2 * time.Second
+    oldBackoff := jobcontroller.DefaultJobPodFailureBackOff
+    defer func() { jobcontroller.DefaultJobPodFailureBackOff = oldBackoff }()
+    jobcontroller.DefaultJobPodFailureBackOff = 2 * time.Second
 
     closeFn, restConfig, clientSet, ns := setup(t, "simple")
     defer closeFn()
@@ -1441,25 +1441,25 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
         return finishTime[i].Before(finishTime[j])
     })
 
-    if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobBackOff.Seconds() {
-        t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobBackOff)
+    if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+        t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobPodFailureBackOff)
     }
-    if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobBackOff.Seconds() {
-        t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobBackOff)
+    if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+        t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
     }
 
     diff := creationTime[2].Sub(finishTime[1]).Seconds()
     // The third pod should not be created before 4 seconds
-    if diff < 2*jobcontroller.DefaultJobBackOff.Seconds() {
-        t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobBackOff)
+    if diff < 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+        t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
     }
     // The third pod should be created within 8 seconds
     // This check rules out double counting
-    if diff >= 4*jobcontroller.DefaultJobBackOff.Seconds() {
-        t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobBackOff)
+    if diff >= 4*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+        t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobPodFailureBackOff)
     }
 }
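
The assertions above pin down the doubling of the pod-failure backoff: with DefaultJobPodFailureBackOff overridden to 2 seconds, the second pod may only be created roughly 2 seconds after the first pod fails and the third roughly 4 seconds after the second. A small sketch of that progression under the default constants, assuming the delay doubles per consecutive failure and is capped at the maximum (the same shape as the getBackoff helper removed from job_controller.go):

package main

import (
    "fmt"
    "math"
    "time"
)

// podFailureDelay sketches the expected delay before recreating a pod after
// `failures` consecutive failures: defaultBackoff * 2^(failures-1), capped.
func podFailureDelay(failures int, defaultBackoff, maxBackoff time.Duration) time.Duration {
    if failures <= 0 {
        return 0
    }
    d := float64(defaultBackoff.Nanoseconds()) * math.Pow(2, float64(failures-1))
    if d > float64(maxBackoff.Nanoseconds()) {
        return maxBackoff
    }
    return time.Duration(d)
}

func main() {
    // DefaultJobPodFailureBackOff and MaxJobPodFailureBackOff from the controller.
    defaultBackoff, maxBackoff := 10*time.Second, 360*time.Second
    for failures := 1; failures <= 7; failures++ {
        // Prints 10s, 20s, 40s, 1m20s, 2m40s, 5m20s, then 6m0s (capped).
        fmt.Println(failures, podFailureDelay(failures, defaultBackoff, maxBackoff))
    }
}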