diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 0aa340e20ef..11fbe6d94a9 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -55,22 +55,21 @@ import (
 	"k8s.io/utils/integer"
 )
 
-const (
-	// maxUncountedPods is the maximum size the slices in
-	// .status.uncountedTerminatedPods should have to keep their representation
-	// roughly below 20 KB.
-	maxUncountedPods          = 500
-	maxPodCreateDeletePerSync = 500
-)
-
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-	// DefaultJobBackOff is the default backoff period, exported for the e2e test
+	// DefaultJobBackOff is the default backoff period. Exported for tests.
 	DefaultJobBackOff = 10 * time.Second
-	// MaxJobBackOff is the max backoff period, exported for the e2e test
+	// MaxJobBackOff is the max backoff period. Exported for tests.
 	MaxJobBackOff = 360 * time.Second
+	// MaxUncountedPods is the maximum size the slices in
+	// .status.uncountedTerminatedPods should have to keep their representation
+	// roughly below 20 KB. Exported for tests.
+	MaxUncountedPods = 500
+	// MaxPodCreateDeletePerSync is the maximum number of pods that can be
+	// created or deleted in a single sync call. Exported for tests.
+	MaxPodCreateDeletePerSync = 500
 )
 
 // Controller ensures that all Job objects have corresponding pods to
@@ -938,7 +937,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 				uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
 			}
 		}
-		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
+		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
 			// The controller added enough Pods already to .status.uncountedTerminatedPods
 			// We stop counting pods and removing finalizers here to:
 			// 1. Ensure that the UIDs representation are under 20 KB.
@@ -1230,8 +1229,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
 		rmAtLeast = 0
 	}
 	podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
-	if len(podsToDelete) > maxPodCreateDeletePerSync {
-		podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
+	if len(podsToDelete) > MaxPodCreateDeletePerSync {
+		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
 	}
 	if len(podsToDelete) > 0 {
 		jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
@@ -1247,8 +1246,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
 
 	if active < wantActive {
 		diff := wantActive - active
-		if diff > int32(maxPodCreateDeletePerSync) {
-			diff = int32(maxPodCreateDeletePerSync)
+		if diff > int32(MaxPodCreateDeletePerSync) {
+			diff = int32(MaxPodCreateDeletePerSync)
 		}
 
 		jm.expectations.ExpectCreations(jobKey, int(diff))
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 5b2e4866b3b..0a599fb4043 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
 
@@ -220,6 +221,10 @@ func TestParallelJobParallelism(t *testing.T) {
 }
 
 func TestParallelJobWithCompletions(t *testing.T) {
+	// Lower limits for a job sync so that we can test partial updates with a low
+	// number of pods.
+	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
+	t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
 	for _, wFinalizers := range []bool{false, true} {
 		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
@@ -230,8 +235,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
 				Spec: batchv1.JobSpec{
-					Parallelism: pointer.Int32Ptr(504),
-					Completions: pointer.Int32Ptr(506),
+					Parallelism: pointer.Int32Ptr(54),
+					Completions: pointer.Int32Ptr(56),
 				},
 			})
 			if err != nil {
@@ -241,23 +246,23 @@ func TestParallelJobWithCompletions(t *testing.T) {
 				t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 504,
+				Active: 54,
 			}, wFinalizers)
 			// Failed Pods are replaced.
 			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 504,
+				Active: 54,
 				Failed: 2,
 			}, wFinalizers)
 			// Pods are created until the number of succeeded Pods equals completions.
-			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil {
+			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Failed:    2,
-				Succeeded: 503,
+				Succeeded: 53,
 				Active:    3,
 			}, wFinalizers)
 			// No more Pods are created after the Job completes.
@@ -267,7 +272,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			validateJobSucceeded(ctx, t, clientSet, jobObj)
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Failed:    2,
-				Succeeded: 506,
+				Succeeded: 56,
 			}, false)
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
@@ -781,22 +786,44 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
 	if err != nil {
 		return fmt.Errorf("listing Job Pods: %w", err)
 	}
+	updates := make([]v1.Pod, 0, cnt)
 	for _, pod := range pods.Items {
-		if cnt == 0 {
+		if len(updates) == cnt {
 			break
 		}
 		if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
 			pod.Status.Phase = phase
-			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
-			if err != nil {
-				return fmt.Errorf("updating Pod status: %w", err)
-			}
-			cnt--
+			updates = append(updates, pod)
 		}
 	}
-	if cnt != 0 {
+	if len(updates) != cnt {
 		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
 	}
+	return updatePodStatuses(ctx, clientSet, updates)
+}
+
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
+	wg := sync.WaitGroup{}
+	wg.Add(len(updates))
+	errCh := make(chan error, len(updates))
+
+	for _, pod := range updates {
+		pod := pod
+		go func() {
+			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
+			if err != nil {
+				errCh <- err
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	select {
+	case err := <-errCh:
+		return fmt.Errorf("updating Pod status: %w", err)
+	default:
+	}
 	return nil
 }
 
@@ -903,3 +930,11 @@ func hasJobTrackingAnnotation(job *batchv1.Job) bool {
 	_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
 	return ok
 }
+
+func setDuringTest(val *int, newVal int) func() {
+	origVal := *val
+	*val = newVal
+	return func() {
+		*val = origVal
+	}
+}
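
For reviewers unfamiliar with the pattern, the new setDuringTest helper simply overrides a package-level limit and returns a restore function that the test hands to t.Cleanup, so the exported MaxUncountedPods and MaxPodCreateDeletePerSync are reset even if the test fails early. A minimal, self-contained sketch of the same idea (maxBatch and TestLoweredLimit are hypothetical names, not part of this patch):

package example

import "testing"

// maxBatch is a hypothetical package-level limit, standing in for
// jobcontroller.MaxPodCreateDeletePerSync from the patch above.
var maxBatch = 500

// setDuringTest mirrors the helper added in job_test.go: it overrides the
// value and returns a function that restores the original.
func setDuringTest(val *int, newVal int) func() {
	origVal := *val
	*val = newVal
	return func() {
		*val = origVal
	}
}

func TestLoweredLimit(t *testing.T) {
	// Lower the limit for this test only; t.Cleanup runs the returned
	// restore function when the test and its subtests complete.
	t.Cleanup(setDuringTest(&maxBatch, 10))

	if maxBatch != 10 {
		t.Fatalf("expected lowered limit 10, got %d", maxBatch)
	}
}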