Ensure Job sync invocations are batched by 1s periods

This commit is contained in:
Michal Wozniak 2023-06-05 18:32:14 +02:00
parent 16cbdef00a
commit 2f6b1d3c0f
2 changed files with 51 additions and 9 deletions

View File

@ -440,7 +440,14 @@ func (jm *Controller) updateJob(old, cur interface{}) {
if err != nil {
return
}
jm.enqueueController(curJob, true)
if curJob.Generation == oldJob.Generation {
// Delay the Job sync when no generation change to batch Job status updates,
// typically triggered by pod events.
jm.enqueueControllerPodUpdate(curJob, true)
} else {
// Trigger immediate sync when spec is changed.
jm.enqueueController(curJob, true)
}
// check if need to add a new rsync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds

View File

@ -1894,10 +1894,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
t.Errorf("Could not create Job: %v", err)
}
if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil {
t.Fatalf("Failed to insert job in index: %v", err)
}
var j *batch.Job
err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) {
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
@ -1909,10 +1905,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
if err != nil {
t.Errorf("Job failed to ensure that start time was set: %v", err)
}
// Make sure the start time is in the informer cache.
if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(j); err != nil {
t.Fatalf("Failed to update job in cache: %v", err)
}
err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) {
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
if err != nil {
@ -3150,6 +3142,49 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
}
}
func TestUpdateJobRequeue(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
cases := map[string]struct {
oldJob *batch.Job
updateFn func(job *batch.Job)
wantRequeuedImmediately bool
}{
"spec update": {
oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
updateFn: func(job *batch.Job) {
job.Spec.Suspend = pointer.Bool(false)
job.Generation++
},
wantRequeuedImmediately: true,
},
"status update": {
oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
updateFn: func(job *batch.Job) {
job.Status.StartTime = &metav1.Time{Time: time.Now()}
},
wantRequeuedImmediately: false,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob)
newJob := tc.oldJob.DeepCopy()
if tc.updateFn != nil {
tc.updateFn(newJob)
}
manager.updateJob(tc.oldJob, newJob)
gotRequeuedImmediately := manager.queue.Len() > 0
if tc.wantRequeuedImmediately != gotRequeuedImmediately {
t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately)
}
})
}
}
func TestJobPodLookup(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)