diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 4046a7e055c..8e98fd6b5f8 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -58,6 +58,7 @@ import (
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/kubernetes/test/integration/util"
 	"k8s.io/utils/pointer"
+	"k8s.io/utils/ptr"
 )
 
 const waitInterval = time.Second
@@ -1681,24 +1682,29 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 	}
 	cases := map[string]struct {
 		podReplacementPolicyEnabled bool
+		deletePods                  bool
+		failPods                    bool
 		wantTerminating             *int32
 		wantFailed                  int
 		wantActive                  int
 		jobSpec                     *batchv1.JobSpec
 	}{
 		"feature flag off, delete pods and verify no terminating status": {
+			deletePods: true,
 			jobSpec:    jobSpecIndexedDefault,
 			wantActive: int(podCount),
 			wantFailed: int(podCount),
 		},
 		"feature flag true, delete pods and verify terminating status": {
 			podReplacementPolicyEnabled: true,
+			deletePods:                  true,
 			jobSpec:                     jobSpecIndexedDefault,
 			wantTerminating:             pointer.Int32(podCount),
 			wantFailed:                  int(podCount),
 		},
 		"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
 			podReplacementPolicyEnabled: true,
+			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
 				Parallelism:          pointer.Int32Ptr(podCount),
 				Completions:          pointer.Int32Ptr(podCount),
@@ -1710,16 +1716,18 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 		},
 		"feature flag true, delete pods, verify terminating status and recreate once failed": {
 			podReplacementPolicyEnabled: true,
+			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
 				Parallelism:          pointer.Int32Ptr(podCount),
 				Completions:          pointer.Int32Ptr(podCount),
-				CompletionMode:       &nonIndexedCompletion,
+				CompletionMode:       &indexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
 			},
 			wantTerminating: pointer.Int32(podCount),
 		},
 		"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
 			podReplacementPolicyEnabled: true,
+			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
 				Parallelism:          pointer.Int32Ptr(podCount),
 				Completions:          pointer.Int32Ptr(podCount),
@@ -1730,6 +1738,7 @@
 		},
 		"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
 			podReplacementPolicyEnabled: false,
+			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
 				Parallelism:          pointer.Int32Ptr(podCount),
 				Completions:          pointer.Int32Ptr(podCount),
@@ -1749,6 +1758,32 @@
 			},
 			wantActive: int(podCount),
 		},
+		"feature flag true, recreate failed pods, and verify active and failed counters": {
+			podReplacementPolicyEnabled: true,
+			failPods:                    true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To(podCount),
+				Completions:          ptr.To(podCount),
+				CompletionMode:       &indexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+			},
+			wantActive:      int(podCount),
+			wantFailed:      int(podCount),
+			wantTerminating: ptr.To[int32](0),
+		},
+		"feature flag true with NonIndexedJob, recreate failed pods, and verify active and failed counters": {
+			podReplacementPolicyEnabled: true,
+			failPods:                    true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To(podCount),
+				Completions:          ptr.To(podCount),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+			},
+			wantActive:      int(podCount),
+			wantFailed:      int(podCount),
+			wantTerminating: ptr.To[int32](0),
+		},
 	}
 	for name, tc := range cases {
 		tc := tc
@@ -1776,7 +1811,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			if err != nil {
 				return false, err
 			}
-			if job.Status.Active == int32(podCount) {
+			if job.Status.Active == podCount {
 				return true, nil
 			}
 			return false, nil
@@ -1784,29 +1819,20 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 		if err != nil {
 			t.Fatalf("Error waiting for Job pods to become active: %v", err)
 		}
-		pods, errList := clientSet.CoreV1().Pods(ns.Namespace).List(ctx, metav1.ListOptions{})
-		if errList != nil {
-			t.Fatalf("Failed to list pods: %v", errList)
+		if tc.deletePods {
+			err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
+				metav1.DeleteOptions{},
+				metav1.ListOptions{
+					Limit: 1000,
+				})
+			if err != nil {
+				t.Fatalf("Failed to delete Pods: %v", err)
+			}
 		}
-		updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) {
-			pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion")
-		})
-		err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-			metav1.DeleteOptions{},
-			metav1.ListOptions{
-				Limit: 1000,
-			})
-		if err != nil {
-			t.Fatalf("Failed to cleanup Pods: %v", err)
-		}
-
-		podsDelete, errList2 := clientSet.CoreV1().Pods(ns.Namespace).List(ctx, metav1.ListOptions{})
-		if errList != nil {
-			t.Fatalf("Failed to list pods: %v", errList2)
-		}
-		for _, val := range podsDelete.Items {
-			if val.DeletionTimestamp == nil {
-				t.Fatalf("Deletion not registered.")
+		if tc.failPods {
+			err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount))
+			if err != nil {
+				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 			}
 		}
 
@@ -2786,22 +2812,6 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
 	return int(updated), nil
 }
 
-func updatePod(t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
-	for _, val := range pods {
-		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
-			newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(context.TODO(), val.Name, metav1.GetOptions{})
-			if err != nil {
-				return err
-			}
-			updateFunc(newPod)
-			_, err = clientSet.CoreV1().Pods(val.Namespace).Update(context.TODO(), newPod, metav1.UpdateOptions{})
-			return err
-		}); err != nil {
-			t.Fatalf("Failed to update pod %s: %v", val.Name, err)
-		}
-	}
-}
-
 func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
 	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {