mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 03:33:56 +00:00
Merge pull request #118470 from mimowo/job-controller-fix-delay
Ensure Job sync invocations are batched by 1s periods
This commit is contained in:
commit
a5332a839d
@ -440,7 +440,14 @@ func (jm *Controller) updateJob(old, cur interface{}) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
jm.enqueueController(curJob, true)
|
if curJob.Generation == oldJob.Generation {
|
||||||
|
// Delay the Job sync when no generation change to batch Job status updates,
|
||||||
|
// typically triggered by pod events.
|
||||||
|
jm.enqueueControllerPodUpdate(curJob, true)
|
||||||
|
} else {
|
||||||
|
// Trigger immediate sync when spec is changed.
|
||||||
|
jm.enqueueController(curJob, true)
|
||||||
|
}
|
||||||
// check if need to add a new rsync for ActiveDeadlineSeconds
|
// check if need to add a new rsync for ActiveDeadlineSeconds
|
||||||
if curJob.Status.StartTime != nil {
|
if curJob.Status.StartTime != nil {
|
||||||
curADS := curJob.Spec.ActiveDeadlineSeconds
|
curADS := curJob.Spec.ActiveDeadlineSeconds
|
||||||
|
@ -1894,10 +1894,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
|
|||||||
t.Errorf("Could not create Job: %v", err)
|
t.Errorf("Could not create Job: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil {
|
|
||||||
t.Fatalf("Failed to insert job in index: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var j *batch.Job
|
var j *batch.Job
|
||||||
err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) {
|
err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) {
|
||||||
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
|
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
|
||||||
@ -1909,10 +1905,6 @@ func TestPastDeadlineJobFinished(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Job failed to ensure that start time was set: %v", err)
|
t.Errorf("Job failed to ensure that start time was set: %v", err)
|
||||||
}
|
}
|
||||||
// Make sure the start time is in the informer cache.
|
|
||||||
if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(j); err != nil {
|
|
||||||
t.Fatalf("Failed to update job in cache: %v", err)
|
|
||||||
}
|
|
||||||
err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) {
|
err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) {
|
||||||
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
|
j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -3150,6 +3142,49 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateJobRequeue(t *testing.T) {
|
||||||
|
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||||
|
cases := map[string]struct {
|
||||||
|
oldJob *batch.Job
|
||||||
|
updateFn func(job *batch.Job)
|
||||||
|
wantRequeuedImmediately bool
|
||||||
|
}{
|
||||||
|
"spec update": {
|
||||||
|
oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
|
||||||
|
updateFn: func(job *batch.Job) {
|
||||||
|
job.Spec.Suspend = pointer.Bool(false)
|
||||||
|
job.Generation++
|
||||||
|
},
|
||||||
|
wantRequeuedImmediately: true,
|
||||||
|
},
|
||||||
|
"status update": {
|
||||||
|
oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
|
||||||
|
updateFn: func(job *batch.Job) {
|
||||||
|
job.Status.StartTime = &metav1.Time{Time: time.Now()}
|
||||||
|
},
|
||||||
|
wantRequeuedImmediately: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for name, tc := range cases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||||
|
manager.podStoreSynced = alwaysReady
|
||||||
|
manager.jobStoreSynced = alwaysReady
|
||||||
|
|
||||||
|
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob)
|
||||||
|
newJob := tc.oldJob.DeepCopy()
|
||||||
|
if tc.updateFn != nil {
|
||||||
|
tc.updateFn(newJob)
|
||||||
|
}
|
||||||
|
manager.updateJob(tc.oldJob, newJob)
|
||||||
|
gotRequeuedImmediately := manager.queue.Len() > 0
|
||||||
|
if tc.wantRequeuedImmediately != gotRequeuedImmediately {
|
||||||
|
t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestJobPodLookup(t *testing.T) {
|
func TestJobPodLookup(t *testing.T) {
|
||||||
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||||
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
|
||||||
|
Loading…
Reference in New Issue
Block a user