Parallelize pod updates in job test

To potentially reduce the number of job controller syncs.

Also reduce the maximum number of pods handled per sync in the tests.
Aldo Culquicondor 2021-09-22 13:39:43 -04:00
parent a438f16741
commit 95c2a8024c
2 changed files with 63 additions and 29 deletions
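
The test-helper rewrite below uses a common Go fan-out pattern: collect the work first, then issue each update from its own goroutine, with a sync.WaitGroup to wait for completion and a buffered error channel to surface the first failure. A minimal standalone sketch of that pattern for reference; applyConcurrently, the sample pod names, and the main wrapper are illustrative only and not part of this commit:

package main

import (
    "fmt"
    "sync"
)

// applyConcurrently runs fn once per item, each call in its own goroutine,
// and returns the first error observed (if any). The error channel is
// buffered to len(items) so no goroutine ever blocks on send.
func applyConcurrently(items []string, fn func(string) error) error {
    wg := sync.WaitGroup{}
    wg.Add(len(items))
    errCh := make(chan error, len(items))
    for _, item := range items {
        item := item // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
        go func() {
            defer wg.Done()
            if err := fn(item); err != nil {
                errCh <- err
            }
        }()
    }
    wg.Wait()
    // Report the first error, if any goroutine sent one.
    select {
    case err := <-errCh:
        return fmt.Errorf("applying update: %w", err)
    default:
    }
    return nil
}

func main() {
    pods := []string{"pod-a", "pod-b", "pod-c"}
    err := applyConcurrently(pods, func(name string) error {
        fmt.Println("updating status of", name) // a real status update call would go here
        return nil
    })
    fmt.Println("first error (if any):", err)
}

Buffering the channel to len(items) means a failing goroutine never blocks on send, and the select with a default case returns the first error, if any, without waiting on an empty channel; the committed updatePodStatuses helper below follows the same shape.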


@@ -55,22 +55,21 @@ import (
 	"k8s.io/utils/integer"
 )
 
-const (
-	// maxUncountedPods is the maximum size the slices in
-	// .status.uncountedTerminatedPods should have to keep their representation
-	// roughly below 20 KB.
-	maxUncountedPods = 500
-	maxPodCreateDeletePerSync = 500
-)
-
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-	// DefaultJobBackOff is the default backoff period, exported for the e2e test
+	// DefaultJobBackOff is the default backoff period. Exported for tests.
 	DefaultJobBackOff = 10 * time.Second
-	// MaxJobBackOff is the max backoff period, exported for the e2e test
+	// MaxJobBackOff is the max backoff period. Exported for tests.
 	MaxJobBackOff = 360 * time.Second
+	// MaxUncountedPods is the maximum size the slices in
+	// .status.uncountedTerminatedPods should have to keep their representation
+	// roughly below 20 KB. Exported for tests
+	MaxUncountedPods = 500
+	// MaxPodCreateDeletePerSync is the maximum number of pods that can be
+	// created or deleted in a single sync call. Exported for tests.
+	MaxPodCreateDeletePerSync = 500
 )
 
 // Controller ensures that all Job objects have corresponding pods to
@@ -938,7 +937,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 				uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
 			}
 		}
-		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
+		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
 			// The controller added enough Pods already to .status.uncountedTerminatedPods
 			// We stop counting pods and removing finalizers here to:
 			// 1. Ensure that the UIDs representation are under 20 KB.
@@ -1230,8 +1229,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
 		rmAtLeast = 0
 	}
 	podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
-	if len(podsToDelete) > maxPodCreateDeletePerSync {
-		podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
+	if len(podsToDelete) > MaxPodCreateDeletePerSync {
+		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
 	}
 	if len(podsToDelete) > 0 {
 		jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
@@ -1247,8 +1246,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
 	if active < wantActive {
 		diff := wantActive - active
-		if diff > int32(maxPodCreateDeletePerSync) {
-			diff = int32(maxPodCreateDeletePerSync)
+		if diff > int32(MaxPodCreateDeletePerSync) {
+			diff = int32(MaxPodCreateDeletePerSync)
 		}
 		jm.expectations.ExpectCreations(jobKey, int(diff))


@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
@@ -220,6 +221,10 @@ func TestParallelJobParallelism(t *testing.T) {
 }
 
 func TestParallelJobWithCompletions(t *testing.T) {
+	// Lower limits for a job sync so that we can test partial updates with a low
+	// number of pods.
+	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
+	t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
 	for _, wFinalizers := range []bool{false, true} {
 		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
@@ -230,8 +235,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
 				Spec: batchv1.JobSpec{
-					Parallelism: pointer.Int32Ptr(504),
-					Completions: pointer.Int32Ptr(506),
+					Parallelism: pointer.Int32Ptr(54),
+					Completions: pointer.Int32Ptr(56),
 				},
 			})
 			if err != nil {
@@ -241,23 +246,23 @@
 				t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 504,
+				Active: 54,
 			}, wFinalizers)
 			// Failed Pods are replaced.
 			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 504,
+				Active: 54,
 				Failed: 2,
 			}, wFinalizers)
 			// Pods are created until the number of succeeded Pods equals completions.
-			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil {
+			if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Failed: 2,
-				Succeeded: 503,
+				Succeeded: 53,
 				Active: 3,
 			}, wFinalizers)
 			// No more Pods are created after the Job completes.
@@ -267,7 +272,7 @@
 			validateJobSucceeded(ctx, t, clientSet, jobObj)
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Failed: 2,
-				Succeeded: 506,
+				Succeeded: 56,
 			}, false)
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
@@ -781,22 +786,44 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
 	if err != nil {
 		return fmt.Errorf("listing Job Pods: %w", err)
 	}
+	updates := make([]v1.Pod, 0, cnt)
 	for _, pod := range pods.Items {
-		if cnt == 0 {
+		if len(updates) == cnt {
 			break
 		}
 		if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
 			pod.Status.Phase = phase
-			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
-			if err != nil {
-				return fmt.Errorf("updating Pod status: %w", err)
-			}
-			cnt--
+			updates = append(updates, pod)
 		}
 	}
-	if cnt != 0 {
+	if len(updates) != cnt {
 		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
 	}
+	return updatePodStatuses(ctx, clientSet, updates)
+}
+
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
+	wg := sync.WaitGroup{}
+	wg.Add(len(updates))
+	errCh := make(chan error, len(updates))
+	for _, pod := range updates {
+		pod := pod
+		go func() {
+			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
+			if err != nil {
+				errCh <- err
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	select {
+	case err := <-errCh:
+		return fmt.Errorf("updating Pod status: %w", err)
+	default:
+	}
 	return nil
 }
@@ -903,3 +930,11 @@ func hasJobTrackingAnnotation(job *batchv1.Job) bool {
 	_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
 	return ok
 }
+
+func setDuringTest(val *int, newVal int) func() {
+	origVal := *val
+	*val = newVal
+	return func() {
+		*val = origVal
+	}
+}