From 8ed23558b44bed8ce60f4171ea0a87739221110c Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Tue, 20 Jun 2023 18:04:53 +0200 Subject: [PATCH] Do not set jm.syncJobBatchPeriod=0 if not needed --- pkg/controller/job/job_controller.go | 28 ++-- pkg/controller/job/job_controller_test.go | 178 ++++++++++++---------- 2 files changed, 111 insertions(+), 95 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index e7b6ece74a7..888cb7de6b1 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -56,9 +56,6 @@ import ( "k8s.io/utils/pointer" ) -// syncJobBatchPeriod is the batch period for controller sync invocations for a Job. -const syncJobBatchPeriod = time.Second - const ( // PodFailurePolicy reason indicates a job failure condition is added due to // a failed pod matching a pod failure policy rule @@ -69,14 +66,16 @@ const ( var controllerKind = batch.SchemeGroupVersion.WithKind("Job") var ( - // DefaultJobApiBackOff is the default backoff period. Exported for tests. - DefaultJobApiBackOff = 1 * time.Second - // MaxJobApiBackOff is the max backoff period. Exported for tests. - MaxJobApiBackOff = 60 * time.Second - // DefaultJobPodFailureBackOff is the default backoff period. Exported for tests. + // syncJobBatchPeriod is the batch period for controller sync invocations for a Job. + syncJobBatchPeriod = time.Second + // DefaultJobApiBackOff is the default API backoff period. Exported for tests. + DefaultJobApiBackOff = time.Second + // MaxJobApiBackOff is the max API backoff period. Exported for tests. + MaxJobApiBackOff = time.Minute + // DefaultJobPodFailureBackOff is the default pod failure backoff period. Exported for tests. DefaultJobPodFailureBackOff = 10 * time.Second - // MaxJobPodFailureBackOff is the max backoff period. Exported for tests. - MaxJobPodFailureBackOff = 360 * time.Second + // MaxJobPodFailureBackOff is the max pod failure backoff period. Exported for tests. + MaxJobPodFailureBackOff = 10 * time.Minute // MaxUncountedPods is the maximum size the slices in // .status.uncountedTerminatedPods should have to keep their representation // roughly below 20 KB. Exported for tests @@ -125,8 +124,6 @@ type Controller struct { broadcaster record.EventBroadcaster recorder record.EventRecorder - syncJobBatchPeriod time.Duration - clock clock.WithTicker // Store with information to compute the expotential backoff delay for pod @@ -159,7 +156,6 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn clock: clock, podBackoffStore: newBackoffStore(), } - jm.syncJobBatchPeriod = syncJobBatchPeriod jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -521,7 +517,7 @@ func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interfac // - Job status update // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) { - jm.enqueueSyncJobInternal(logger, obj, jm.syncJobBatchPeriod) + jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod) } // enqueueSyncJobWithDelay tells the controller to invoke syncJob with a @@ -529,8 +525,8 @@ func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) // It is used when pod recreations are delayed due to pod failures. // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. 
func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
-	if delay < jm.syncJobBatchPeriod {
-		delay = jm.syncJobBatchPeriod
+	if delay < syncJobBatchPeriod {
+		delay = syncJobBatchPeriod
 	}
 	jm.enqueueSyncJobInternal(logger, obj, delay)
 }
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index cd023a31b04..e583e466563 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -64,6 +64,10 @@ import (
 var realClock = &clock.RealClock{}
 var alwaysReady = func() bool { return true }
 
+const fastSyncJobBatchPeriod = 10 * time.Millisecond
+const fastJobApiBackoff = 10 * time.Millisecond
+const fastRequeue = 10 * time.Millisecond
+
 // testFinishedAt represents time one second later than unix epoch
 // this will be used in various test cases where we don't want back-off to kick in
 var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second))
@@ -3096,25 +3100,27 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	defer func() { DefaultJobApiBackOff = 1 * time.Second }()
-	DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
-		updateErr               error
-		wantRequeuedImmediately bool
+		updateErr    error
+		wantRequeued bool
 	}{
-		"no error": {},
+		"no error": {
+			wantRequeued: false,
+		},
 		"generic error": {
-			updateErr:               fmt.Errorf("update error"),
-			wantRequeuedImmediately: true,
+			updateErr:    fmt.Errorf("update error"),
+			wantRequeued: true,
 		},
 		"conflict error": {
-			updateErr:               apierrors.NewConflict(schema.GroupResource{}, "", nil),
-			wantRequeuedImmediately: true,
+			updateErr:    apierrors.NewConflict(schema.GroupResource{}, "", nil),
+			wantRequeued: true,
 		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
+			fakeClock := clocktesting.NewFakeClock(time.Now())
+			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -3126,17 +3132,16 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			manager.queue.Add(testutil.GetKey(job, t))
 			manager.processNextWorkItem(context.TODO())
-			// With DefaultJobApiBackOff=0, the queueing is synchronous.
-			requeued := manager.queue.Len() > 0
-			if requeued != tc.wantRequeuedImmediately {
-				t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
-			}
-			if requeued {
-				key, _ := manager.queue.Get()
-				expectedKey := testutil.GetKey(job, t)
-				if key != expectedKey {
-					t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
-				}
+			if tc.wantRequeued {
+				verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
+			} else {
+				// We advance the clock to make sure there are no items waiting
+				// to be added into the queue. 
We also sleep a little to give the + // delaying queue time to move the potential items from pre-queue + // into the queue asynchronously. + manager.clock.Sleep(fastJobApiBackoff) + time.Sleep(time.Millisecond) + verifyEmptyQueue(ctx, t, manager) } }) } @@ -3371,15 +3376,15 @@ func TestGetPodsForJob(t *testing.T) { } func TestAddPod(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. - jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3394,9 +3399,7 @@ func TestAddPod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) jm.addPod(logger, pod1) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) @@ -3407,9 +3410,7 @@ func TestAddPod(t *testing.T) { } jm.addPod(logger, pod2) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) @@ -3421,13 +3422,13 @@ func TestAddPod(t *testing.T) { } func TestAddPodOrphan(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. 
- jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3446,20 +3447,18 @@ func TestAddPodOrphan(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) jm.addPod(logger, pod1) - if got, want := jm.queue.Len(), 2; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePod(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. - jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3476,9 +3475,7 @@ func TestUpdatePod(t *testing.T) { prev := *pod1 bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) @@ -3491,9 +3488,7 @@ func TestUpdatePod(t *testing.T) { prev = *pod2 bumpResourceVersion(pod2) jm.updatePod(logger, &prev, pod2) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) @@ -3505,13 +3500,13 @@ func TestUpdatePod(t *testing.T) { } func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. 
- jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3529,20 +3524,18 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) { prev.Labels = map[string]string{"foo2": "bar2"} bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) - if got, want := jm.queue.Len(), 2; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePodChangeControllerRef(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. - jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3559,20 +3552,18 @@ func TestUpdatePodChangeControllerRef(t *testing.T) { prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)} bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) - if got, want := jm.queue.Len(), 2; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestUpdatePodRelease(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) _, ctx := ktesting.NewTestContext(t) logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. - jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3589,19 +3580,17 @@ func TestUpdatePodRelease(t *testing.T) { pod1.OwnerReferences = nil bumpResourceVersion(pod1) jm.updatePod(logger, &prev, pod1) - if got, want := jm.queue.Len(), 2; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2) } func TestDeletePod(t *testing.T) { + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. 
- jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3616,9 +3605,7 @@ func TestDeletePod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) jm.deletePod(logger, pod1, true) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done := jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod1.Name) @@ -3629,9 +3616,7 @@ func TestDeletePod(t *testing.T) { } jm.deletePod(logger, pod2, true) - if got, want := jm.queue.Len(), 1; got != want { - t.Fatalf("queue.Len() = %v, want %v", got, want) - } + verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1) key, done = jm.queue.Get() if key == nil || done { t.Fatalf("failed to enqueue controller for pod %v", pod2.Name) @@ -3643,13 +3628,13 @@ func TestDeletePod(t *testing.T) { } func TestDeletePodOrphan(t *testing.T) { + // Disable batching of pod updates to show it does not get requeued at all + t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0)) logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - // Disable batching of pod updates. - jm.syncJobBatchPeriod = 0 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion) job1.Name = "job1" @@ -3889,10 +3874,12 @@ func bumpResourceVersion(obj metav1.Object) { } func TestJobApiBackoffReset(t *testing.T) { + t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff)) _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakeClock := clocktesting.NewFakeClock(time.Now()) + manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -3916,11 +3903,9 @@ func TestJobApiBackoffReset(t *testing.T) { // the queue is emptied on success fakePodControl.Err = nil + manager.clock.Sleep(fastJobApiBackoff) manager.processNextWorkItem(context.TODO()) - retries = manager.queue.NumRequeues(key) - if retries != 0 { - t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries) - } + verifyEmptyQueue(ctx, t, manager) } var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{} @@ -4434,6 +4419,33 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { } } +func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { + t.Helper() + verifyEmptyQueue(ctx, t, jm) + awaitForQueueLen(ctx, t, jm, wantQueueLen) +} + +func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) { + t.Helper() + verifyEmptyQueue(ctx, t, jm) + if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) { + if requeued := jm.queue.Len() == wantQueueLen; requeued { + return true, nil + } + 
jm.clock.Sleep(fastRequeue)
+		return false, nil
+	}); err != nil {
+		t.Errorf("Failed to await the expected queue.Len(); want %v, got %v", wantQueueLen, jm.queue.Len())
+	}
+}
+
+func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) {
+	t.Helper()
+	if jm.queue.Len() > 0 {
+		t.Errorf("Unexpected queue.Len(); want %d, got %d", 0, jm.queue.Len())
+	}
+}
+
 type podBuilder struct {
 	*v1.Pod
 }
@@ -4515,3 +4527,11 @@ func (pb podBuilder) deletionTimestamp() podBuilder {
 	pb.DeletionTimestamp = &metav1.Time{}
 	return pb
 }
+
+func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
+	origVal := *val
+	*val = newVal
+	return func() {
+		*val = origVal
+	}
+}
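
For readers unfamiliar with the testing pattern this patch adopts -- temporarily shrinking a package-level duration with setDurationDuringTest plus t.Cleanup, and stepping a fake clock instead of sleeping through real backoff periods -- a minimal standalone sketch follows. The example package, the pollInterval variable, and TestOverrideDuration are illustrative names only, not part of the Kubernetes tree; setDurationDuringTest mirrors the helper added at the end of the patch, and the fake clock is assumed to be the k8s.io/utils/clock/testing implementation that the controller tests import as clocktesting.

package example

import (
	"testing"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

// pollInterval stands in for a package-level tuning knob such as
// syncJobBatchPeriod; it is a var (not a const) so tests can override it.
var pollInterval = time.Second

// setDurationDuringTest overrides *val and returns a function that restores
// the original value; registering the returned function with t.Cleanup scopes
// the override to a single test (same shape as the helper in the patch).
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
	origVal := *val
	*val = newVal
	return func() {
		*val = origVal
	}
}

func TestOverrideDuration(t *testing.T) {
	// Shrink the period for this test only; other tests keep the default.
	t.Cleanup(setDurationDuringTest(&pollInterval, 10*time.Millisecond))
	if pollInterval != 10*time.Millisecond {
		t.Fatalf("pollInterval = %v, want %v", pollInterval, 10*time.Millisecond)
	}

	// A fake clock advances time deterministically: Sleep only steps the
	// fake time forward, it does not block for a real 10ms.
	fakeClock := clocktesting.NewFakeClock(time.Now())
	before := fakeClock.Now()
	fakeClock.Sleep(pollInterval)
	if got := fakeClock.Now().Sub(before); got != pollInterval {
		t.Fatalf("fake clock advanced by %v, want %v", got, pollInterval)
	}
}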