diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 1da83969253..31d4727d897 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -56,8 +56,6 @@ import (
 )
 
 const (
-	statusUpdateRetries = 3
-
 	// maxUncountedPods is the maximum size the slices in
 	// .status.uncountedTerminatedPods should have to keep their representation
 	// roughly below 20 KB.
@@ -82,7 +80,7 @@ type Controller struct {
 	podControl controller.PodControlInterface
 
 	// To allow injection of the following for testing.
-	updateStatusHandler func(job *batch.Job) error
+	updateStatusHandler func(job *batch.Job) (*batch.Job, error)
 	patchJobHandler     func(job *batch.Job, patch []byte) error
 	syncHandler         func(jobKey string) (bool, error)
 
@@ -438,8 +436,13 @@ func (jm *Controller) processNextWorkItem() bool {
 		return true
 	}
 
-	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
-	jm.queue.AddRateLimited(key)
+	utilruntime.HandleError(fmt.Errorf("syncing job: %w", err))
+	if !apierrors.IsConflict(err) {
+		// If this was a conflict error, we expect a Job or Pod update event, which
+		// will add the job back to the queue. Avoiding the rate limited requeue
+		// saves an unnecessary sync.
+		jm.queue.AddRateLimited(key)
+	}
 	return true
 }
 
@@ -754,7 +757,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
 		job.Status.Active = active
 		err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
 		if err != nil {
-			return false, err
+			return false, fmt.Errorf("tracking status: %w", err)
 		}
 		jobFinished := IsJobFinished(&job)
 		if jobHasNewFailure && !jobFinished {
@@ -782,7 +785,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
 	job.Status.UncountedTerminatedPods = nil
 	jm.enactJobFinished(&job, finishedCondition)
 
-	if err := jm.updateStatusHandler(&job); err != nil {
+	if _, err := jm.updateStatusHandler(&job); err != nil {
 		return forget, err
 	}
 
@@ -940,7 +943,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 				job.Status.CompletedIndexes = succeededIndexes.String()
 			}
 			var err error
-			if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
+			if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
 				return err
 			}
 			podsToRemoveFinalizer = nil
@@ -953,14 +956,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 		job.Status.CompletedIndexes = succeededIndexes.String()
 	}
 	var err error
-	if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
 		return err
 	}
 	if jm.enactJobFinished(job, finishedCond) {
 		needsFlush = true
 	}
 	if needsFlush {
-		if err := jm.updateStatusHandler(job); err != nil {
+		if _, err := jm.updateStatusHandler(job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 	}
@@ -976,10 +979,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 // 4. (if not all removals succeeded) flush Job status again.
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
+	var err error
 	if needsFlush {
-		if err := jm.updateStatusHandler(job); err != nil {
-			return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
+		if job, err = jm.updateStatusHandler(job); err != nil {
+			return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 		}
 		needsFlush = false
 	}
@@ -999,11 +1003,11 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
 		needsFlush = true
 	}
 	if rmErr != nil && needsFlush {
-		if err := jm.updateStatusHandler(job); err != nil {
-			return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
+		if job, err := jm.updateStatusHandler(job); err != nil {
+			return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 	}
-	return needsFlush, rmErr
+	return job, needsFlush, rmErr
 }
 
 // cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
@@ -1354,22 +1358,9 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P
 	return rm
 }
 
-func (jm *Controller) updateJobStatus(job *batch.Job) error {
-	jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
-	var err error
-	for i := 0; i <= statusUpdateRetries; i = i + 1 {
-		var newJob *batch.Job
-		newJob, err = jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
-		if err != nil {
-			break
-		}
-		newJob.Status = job.Status
-		if _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{}); err == nil {
-			break
-		}
-	}
-
-	return err
+// updateJobStatus calls the API to update the job status.
+func (jm *Controller) updateJobStatus(job *batch.Job) (*batch.Job, error) {
+	return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
 }
 
 func (jm *Controller) patchJob(job *batch.Job, data []byte) error {
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 995a5ea9ad6..de9da1548b3 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -29,6 +29,7 @@ import (
 	batch "k8s.io/api/batch/v1"
 	"k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -745,9 +746,9 @@ func TestControllerSyncJob(t *testing.T) {
 			setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
 
 			actual := job
-			manager.updateStatusHandler = func(job *batch.Job) error {
+			manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 				actual = job
-				return nil
+				return job, nil
 			}
 
 			// run
@@ -976,9 +977,9 @@ func TestSyncJobLegacyTracking(t *testing.T) {
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job)
 
 			var actual *batch.Job
-			manager.updateStatusHandler = func(job *batch.Job) error {
+			manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 				actual = job
-				return nil
+				return job, nil
 			}
 
 			// Run.
@@ -1527,9 +1528,9 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
 			manager.podControl = &fakePodControl
 			var statusUpdates []batch.JobStatus
-			manager.updateStatusHandler = func(job *batch.Job) error {
+			manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 				statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
-				return tc.statusUpdateErr
+				return job, tc.statusUpdateErr
 			}
 
 			if tc.job.Status.UncountedTerminatedPods == nil {
@@ -1654,9 +1655,9 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			manager.podStoreSynced = alwaysReady
 			manager.jobStoreSynced = alwaysReady
 			var actual *batch.Job
-			manager.updateStatusHandler = func(job *batch.Job) error {
+			manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 				actual = job
-				return nil
+				return job, nil
 			}
 
 			// job & pods setup
@@ -1731,9 +1732,9 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 	var actual *batch.Job
-	manager.updateStatusHandler = func(job *batch.Job) error {
+	manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 		actual = job
-		return nil
+		return job, nil
 	}
 
 	job := newJob(1, 1, 6, batch.NonIndexedCompletion)
@@ -1796,7 +1797,9 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
-	manager.updateStatusHandler = func(job *batch.Job) error { return nil }
+	manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
+		return job, nil
+	}
 	job := newJob(2, 2, 6, batch.NonIndexedCompletion)
 	forget, err := manager.syncJob(testutil.GetKey(job, t))
 	if err != nil {
@@ -1816,30 +1819,52 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
-	manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
-	fakePodControl := controller.FakePodControl{}
-	manager.podControl = &fakePodControl
-	manager.podStoreSynced = alwaysReady
-	manager.jobStoreSynced = alwaysReady
-	updateError := fmt.Errorf("update error")
-	manager.updateStatusHandler = func(job *batch.Job) error {
-		manager.queue.AddRateLimited(testutil.GetKey(job, t))
-		return updateError
+	cases := map[string]struct {
+		updateErr      error
+		wantRequeue    bool
+		withFinalizers bool
+	}{
+		"no error": {},
+		"generic error": {
+			updateErr:   fmt.Errorf("update error"),
+			wantRequeue: true,
+		},
+		"conflict error": {
+			updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil),
+		},
+		"conflict error, with finalizers": {
+			withFinalizers: true,
+			updateErr:      apierrors.NewConflict(schema.GroupResource{}, "", nil),
+		},
 	}
-	job := newJob(2, 2, 6, batch.NonIndexedCompletion)
-	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	forget, err := manager.syncJob(testutil.GetKey(job, t))
-	if err == nil || err != updateError {
-		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
-	}
-	if forget != false {
-		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
Expected %v, saw %v\n", false, forget) - } - t.Log("Waiting for a job in the queue") - key, _ := manager.queue.Get() - expectedKey := testutil.GetKey(job, t) - if key != expectedKey { - t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key) + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.withFinalizers)() + manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + return job, tc.updateErr + } + job := newJob(2, 2, 6, batch.NonIndexedCompletion) + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + manager.queue.Add(testutil.GetKey(job, t)) + manager.processNextWorkItem() + // With DefaultJobBackOff=0, the queueing is synchronous. + requeued := manager.queue.Len() > 0 + if requeued != tc.wantRequeue { + t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeue) + } + if requeued { + key, _ := manager.queue.Get() + expectedKey := testutil.GetKey(job, t) + if key != expectedKey { + t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key) + } + } + }) } } @@ -2323,7 +2348,9 @@ func TestSyncJobExpectations(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - manager.updateStatusHandler = func(job *batch.Job) error { return nil } + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + return job, nil + } job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) @@ -2527,9 +2554,9 @@ func TestJobBackoffReset(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) error { + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { actual = job - return nil + return job, nil } // job & pods setup @@ -2707,9 +2734,9 @@ func TestJobBackoffForOnFailure(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) error { + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { actual = job - return nil + return job, nil } // job & pods setup @@ -2809,9 +2836,9 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) error { + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { actual = job - return nil + return job, nil } // job & pods setup