Add integration tests for tracking ready Pods

Change-Id: I1f20657f4f9cd4daad73149f969bad52a33698fa
This commit is contained in:
Aldo Culquicondor 2021-09-10 12:56:46 -04:00
parent 60fc90967b
commit 68f2c892e5

View File

@ -40,6 +40,7 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
jobcontroller "k8s.io/kubernetes/pkg/controller/job" jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
@ -106,9 +107,26 @@ func TestNonParallelJob(t *testing.T) {
} }
func TestParallelJob(t *testing.T) { func TestParallelJob(t *testing.T) {
for _, wFinalizers := range []bool{false, true} { cases := map[string]struct {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { trackWithFinalizers bool
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() enableReadyPods bool
}{
"none": {},
"with finalizers": {
trackWithFinalizers: true,
},
"ready pods": {
enableReadyPods: true,
},
"all": {
trackWithFinalizers: true,
enableReadyPods: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "parallel") closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn() defer closeFn()
@ -123,43 +141,71 @@ func TestParallelJob(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Failed to create Job: %v", err) t.Fatalf("Failed to create Job: %v", err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want := podsByStatus{Active: 5}
Active: 5, if tc.enableReadyPods {
}, wFinalizers) want.Ready = pointer.Int32Ptr(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Tracks ready pods, if enabled.
if err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
if tc.enableReadyPods {
*want.Ready = 2
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Failed Pods are replaced. // Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Active: 5, Active: 5,
Failed: 2, Failed: 2,
}, wFinalizers) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Once one Pod succeeds, no more Pods are created, even if some fail. // Once one Pod succeeds, no more Pods are created, even if some fail.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 1, Succeeded: 1,
Active: 4, Active: 4,
}, wFinalizers) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Failed: 4, Failed: 4,
Succeeded: 1, Succeeded: 1,
Active: 2, Active: 2,
}, wFinalizers) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// No more Pods are created after remaining Pods succeed. // No more Pods are created after remaining Pods succeed.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
} }
validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Failed: 4, Failed: 4,
Succeeded: 3, Succeeded: 3,
}, false) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}) })
} }
@ -226,9 +272,26 @@ func TestParallelJobWithCompletions(t *testing.T) {
// number of pods. // number of pods.
t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10)) t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10)) t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
for _, wFinalizers := range []bool{false, true} { cases := map[string]struct {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { trackWithFinalizers bool
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() enableReadyPods bool
}{
"none": {},
"with finalizers": {
trackWithFinalizers: true,
},
"ready pods": {
enableReadyPods: true,
},
"all": {
trackWithFinalizers: true,
enableReadyPods: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "completions") closeFn, restConfig, clientSet, ns := setup(t, "completions")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet) ctx, cancel := startJobController(restConfig, clientSet)
@ -243,38 +306,62 @@ func TestParallelJobWithCompletions(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Failed to create Job: %v", err) t.Fatalf("Failed to create Job: %v", err)
} }
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers { if got := hasJobTrackingAnnotation(jobObj); got != tc.trackWithFinalizers {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, tc.trackWithFinalizers)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want := podsByStatus{Active: 54}
Active: 54, if tc.enableReadyPods {
}, wFinalizers) want.Ready = pointer.Int32Ptr(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Tracks ready pods, if enabled.
if err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
if tc.enableReadyPods {
want.Ready = pointer.Int32(52)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Failed Pods are replaced. // Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Active: 54, Active: 54,
Failed: 2, Failed: 2,
}, wFinalizers) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(50)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Pods are created until the number of succeeded Pods equals completions. // Pods are created until the number of succeeded Pods equals completions.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 53, Succeeded: 53,
Active: 3, Active: 3,
}, wFinalizers) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// No more Pods are created after the Job completes. // No more Pods are created after the Job completes.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
} }
validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ want = podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 56, Succeeded: 56,
}, false) }
if tc.enableReadyPods {
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}) })
} }
@ -707,6 +794,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
type podsByStatus struct { type podsByStatus struct {
Active int Active int
Ready *int32
Failed int Failed int
Succeeded int Succeeded int
} }
@ -721,6 +809,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
} }
actualCounts = podsByStatus{ actualCounts = podsByStatus{
Active: int(updatedJob.Status.Active), Active: int(updatedJob.Status.Active),
Ready: updatedJob.Status.Ready,
Succeeded: int(updatedJob.Status.Succeeded), Succeeded: int(updatedJob.Status.Succeeded),
Failed: int(updatedJob.Status.Failed), Failed: int(updatedJob.Status.Failed),
} }
@ -858,6 +947,28 @@ func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset
} }
func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error { func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error {
op := func(p *v1.Pod) bool {
p.Status.Phase = phase
return true
}
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}
func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) error {
op := func(p *v1.Pod) bool {
if podutil.IsPodReady(p) {
return false
}
p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{
Type: v1.PodReady,
Status: v1.ConditionTrue,
})
return true
}
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}
func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) error {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
return fmt.Errorf("listing Job Pods: %w", err) return fmt.Errorf("listing Job Pods: %w", err)
@ -868,7 +979,9 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
break break
} }
if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded { if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
pod.Status.Phase = phase if !op(&pod) {
continue
}
updates = append(updates, pod) updates = append(updates, pod)
} }
} }