Remove GET job and retries for status updates.

Doing a GET right before retrying has 2 problems:
- It can masquerade conflicts
- It adds an additional delay

As for retries, we are better of going through the sync backoff.

In the case of conflict, we know that there was a Job update that would trigger another sync, so there is no need to do a rate limited requeue.
This commit is contained in:
Aldo Culquicondor
2021-09-23 11:42:30 -04:00
parent 2541fcf256
commit eebd678cda
2 changed files with 91 additions and 73 deletions

View File

@@ -56,8 +56,6 @@ import (
)
const (
statusUpdateRetries = 3
// maxUncountedPods is the maximum size the slices in
// .status.uncountedTerminatedPods should have to keep their representation
// roughly below 20 KB.
@@ -82,7 +80,7 @@ type Controller struct {
podControl controller.PodControlInterface
// To allow injection of the following for testing.
updateStatusHandler func(job *batch.Job) error
updateStatusHandler func(job *batch.Job) (*batch.Job, error)
patchJobHandler func(job *batch.Job, patch []byte) error
syncHandler func(jobKey string) (bool, error)
@@ -438,8 +436,13 @@ func (jm *Controller) processNextWorkItem() bool {
return true
}
utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
jm.queue.AddRateLimited(key)
utilruntime.HandleError(fmt.Errorf("syncing job: %w", err))
if !apierrors.IsConflict(err) {
// If this was a conflict error, we expect a Job or Pod update event, which
// will add the job back to the queue. Avoiding the rate limited requeue
// saves an unnecessary sync.
jm.queue.AddRateLimited(key)
}
return true
}
@@ -754,7 +757,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
job.Status.Active = active
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
if err != nil {
return false, err
return false, fmt.Errorf("tracking status: %w", err)
}
jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
@@ -782,7 +785,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
job.Status.UncountedTerminatedPods = nil
jm.enactJobFinished(&job, finishedCondition)
if err := jm.updateStatusHandler(&job); err != nil {
if _, err := jm.updateStatusHandler(&job); err != nil {
return forget, err
}
@@ -940,7 +943,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err
}
podsToRemoveFinalizer = nil
@@ -953,14 +956,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
needsFlush = true
}
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
if _, err := jm.updateStatusHandler(job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err)
}
}
@@ -976,10 +979,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
var err error
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
if job, err = jm.updateStatusHandler(job); err != nil {
return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
}
needsFlush = false
}
@@ -999,11 +1003,11 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
needsFlush = true
}
if rmErr != nil && needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
if job, err := jm.updateStatusHandler(job); err != nil {
return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
}
}
return needsFlush, rmErr
return job, needsFlush, rmErr
}
// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
@@ -1354,22 +1358,9 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P
return rm
}
func (jm *Controller) updateJobStatus(job *batch.Job) error {
jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
var err error
for i := 0; i <= statusUpdateRetries; i = i + 1 {
var newJob *batch.Job
newJob, err = jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
if err != nil {
break
}
newJob.Status = job.Status
if _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{}); err == nil {
break
}
}
return err
// updateJobStatus calls the API to update the job status.
func (jm *Controller) updateJobStatus(job *batch.Job) (*batch.Job, error) {
return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
}
func (jm *Controller) patchJob(job *batch.Job, data []byte) error {