From 2f6b1d3c0fbd439ee9333751374535cc8f78e6ac Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 5 Jun 2023 18:32:14 +0200 Subject: [PATCH] Ensure Job sync invocations are batched by 1s periods --- pkg/controller/job/job_controller.go | 9 +++- pkg/controller/job/job_controller_test.go | 51 +++++++++++++++++++---- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index da5e5e0f25c..28c97ee4260 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -440,7 +440,14 @@ func (jm *Controller) updateJob(old, cur interface{}) { if err != nil { return } - jm.enqueueController(curJob, true) + if curJob.Generation == oldJob.Generation { + // Delay the Job sync when no generation change to batch Job status updates, + // typically triggered by pod events. + jm.enqueueControllerPodUpdate(curJob, true) + } else { + // Trigger immediate sync when spec is changed. + jm.enqueueController(curJob, true) + } // check if need to add a new rsync for ActiveDeadlineSeconds if curJob.Status.StartTime != nil { curADS := curJob.Spec.ActiveDeadlineSeconds diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index ec9115c3a4d..0cf8e79abbf 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1894,10 +1894,6 @@ func TestPastDeadlineJobFinished(t *testing.T) { t.Errorf("Could not create Job: %v", err) } - if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { - t.Fatalf("Failed to insert job in index: %v", err) - } - var j *batch.Job err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) @@ -1909,10 +1905,6 @@ func TestPastDeadlineJobFinished(t *testing.T) { if err != nil { t.Errorf("Job failed to ensure that start time was set: %v", err) } - // Make sure the start time is in the informer cache. - if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(j); err != nil { - t.Fatalf("Failed to update job in cache: %v", err) - } err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{}) if err != nil { @@ -3150,6 +3142,49 @@ func TestSyncJobUpdateRequeue(t *testing.T) { } } +func TestUpdateJobRequeue(t *testing.T) { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + cases := map[string]struct { + oldJob *batch.Job + updateFn func(job *batch.Job) + wantRequeuedImmediately bool + }{ + "spec update": { + oldJob: newJob(1, 1, 1, batch.IndexedCompletion), + updateFn: func(job *batch.Job) { + job.Spec.Suspend = pointer.Bool(false) + job.Generation++ + }, + wantRequeuedImmediately: true, + }, + "status update": { + oldJob: newJob(1, 1, 1, batch.IndexedCompletion), + updateFn: func(job *batch.Job) { + job.Status.StartTime = &metav1.Time{Time: time.Now()} + }, + wantRequeuedImmediately: false, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob) + newJob := tc.oldJob.DeepCopy() + if tc.updateFn != nil { + tc.updateFn(newJob) + } + manager.updateJob(tc.oldJob, newJob) + gotRequeuedImmediately := manager.queue.Len() > 0 + if tc.wantRequeuedImmediately != gotRequeuedImmediately { + t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately) + } + }) + } +} + func TestJobPodLookup(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)