From 68f2c892e5f0dffe3e51057b309267e3aa3362be Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Fri, 10 Sep 2021 12:56:46 -0400 Subject: [PATCH] Add integration tests for tracking ready Pods Change-Id: I1f20657f4f9cd4daad73149f969bad52a33698fa --- test/integration/job/job_test.go | 171 +++++++++++++++++++++++++------ 1 file changed, 142 insertions(+), 29 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index e0425898d36..d5cf1e592c4 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -40,6 +40,7 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" jobcontroller "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" @@ -106,9 +107,26 @@ func TestNonParallelJob(t *testing.T) { } func TestParallelJob(t *testing.T) { - for _, wFinalizers := range []bool{false, true} { - t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() + cases := map[string]struct { + trackWithFinalizers bool + enableReadyPods bool + }{ + "none": {}, + "with finalizers": { + trackWithFinalizers: true, + }, + "ready pods": { + enableReadyPods: true, + }, + "all": { + trackWithFinalizers: true, + enableReadyPods: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)() closeFn, restConfig, clientSet, ns := setup(t, "parallel") defer closeFn() @@ -123,43 +141,71 @@ func TestParallelJob(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 5, - }, wFinalizers) + want := podsByStatus{Active: 5} + if tc.enableReadyPods { + want.Ready = pointer.Int32Ptr(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) + + // Tracks ready pods, if enabled. + if err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil { + t.Fatalf("Failed Marking Pods as ready: %v", err) + } + if tc.enableReadyPods { + *want.Ready = 2 + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) + // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Active: 5, Failed: 2, - }, wFinalizers) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) // Once one Pod succeeds, no more Pods are created, even if some fail. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Failed: 2, Succeeded: 1, Active: 4, - }, wFinalizers) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Failed: 4, Succeeded: 1, Active: 2, - }, wFinalizers) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) // No more Pods are created after remaining Pods succeed. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) } validateJobSucceeded(ctx, t, clientSet, jobObj) - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Failed: 4, Succeeded: 3, - }, false) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) } @@ -226,9 +272,26 @@ func TestParallelJobWithCompletions(t *testing.T) { // number of pods. t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10)) t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10)) - for _, wFinalizers := range []bool{false, true} { - t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() + cases := map[string]struct { + trackWithFinalizers bool + enableReadyPods bool + }{ + "none": {}, + "with finalizers": { + trackWithFinalizers: true, + }, + "ready pods": { + enableReadyPods: true, + }, + "all": { + trackWithFinalizers: true, + enableReadyPods: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)() closeFn, restConfig, clientSet, ns := setup(t, "completions") defer closeFn() ctx, cancel := startJobController(restConfig, clientSet) @@ -243,38 +306,62 @@ func TestParallelJobWithCompletions(t *testing.T) { if err != nil { t.Fatalf("Failed to create Job: %v", err) } - if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers { - t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) + if got := hasJobTrackingAnnotation(jobObj); got != tc.trackWithFinalizers { + t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, tc.trackWithFinalizers) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 54, - }, wFinalizers) + want := podsByStatus{Active: 54} + if tc.enableReadyPods { + want.Ready = pointer.Int32Ptr(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) + + // Tracks ready pods, if enabled. + if err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil { + t.Fatalf("Failed Marking Pods as ready: %v", err) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(52) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) + // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Active: 54, Failed: 2, - }, wFinalizers) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(50) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) // Pods are created until the number of succeeded Pods equals completions. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Failed: 2, Succeeded: 53, Active: 3, - }, wFinalizers) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers) // No more Pods are created after the Job completes. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) } validateJobSucceeded(ctx, t, clientSet, jobObj) - validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + want = podsByStatus{ Failed: 2, Succeeded: 56, - }, false) + } + if tc.enableReadyPods { + want.Ready = pointer.Int32(0) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) } @@ -707,6 +794,7 @@ func TestNodeSelectorUpdate(t *testing.T) { type podsByStatus struct { Active int + Ready *int32 Failed int Succeeded int } @@ -721,6 +809,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse } actualCounts = podsByStatus{ Active: int(updatedJob.Status.Active), + Ready: updatedJob.Status.Ready, Succeeded: int(updatedJob.Status.Succeeded), Failed: int(updatedJob.Status.Failed), } @@ -858,6 +947,28 @@ func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset } func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error { + op := func(p *v1.Pod) bool { + p.Status.Phase = phase + return true + } + return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt) +} + +func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) error { + op := func(p *v1.Pod) bool { + if podutil.IsPodReady(p) { + return false + } + p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }) + return true + } + return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt) +} + +func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) error { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("listing Job Pods: %w", err) @@ -868,7 +979,9 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj break } if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded { - pod.Status.Phase = phase + if !op(&pod) { + continue + } updates = append(updates, pod) } }