From f2b723a130eee6fe6a1d91fdd39f993582e794ab Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Mon, 16 Oct 2023 21:14:09 +0200 Subject: [PATCH 1/6] fix: improve assertion for Failed PodReplacementPolicy integration test cases --- test/integration/job/job_test.go | 71 ++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 49338637aeb..742df86e0a1 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1681,6 +1681,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { } cases := map[string]struct { podReplacementPolicyEnabled bool + failPodsInsteadOfDeletion bool wantTerminating *int32 wantFailed int wantActive int @@ -1716,7 +1717,10 @@ func TestJobPodReplacementPolicy(t *testing.T) { CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, - wantTerminating: pointer.Int32(podCount), + failPodsInsteadOfDeletion: true, + wantActive: int(podCount), + wantFailed: int(podCount), + wantTerminating: pointer.Int32(0), }, "feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": { podReplacementPolicyEnabled: true, @@ -1726,7 +1730,10 @@ func TestJobPodReplacementPolicy(t *testing.T) { CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, - wantTerminating: pointer.Int32(podCount), + failPodsInsteadOfDeletion: true, + wantActive: int(podCount), + wantFailed: int(podCount), + wantTerminating: pointer.Int32(0), }, "feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": { podReplacementPolicyEnabled: false, @@ -1776,7 +1783,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { if err != nil { return false, err } - if job.Status.Active == int32(podCount) { + if job.Status.Active == podCount { return true, nil } return false, nil @@ -1784,30 +1791,13 @@ func TestJobPodReplacementPolicy(t *testing.T) { if err != nil { t.Fatalf("Error waiting for Job pods to become active: %v", err) } - pods, errList := clientSet.CoreV1().Pods(ns.Namespace).List(ctx, metav1.ListOptions{}) - if errList != nil { - t.Fatalf("Failed to list pods: %v", errList) - } - updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) { - pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion") - }) - err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx, - metav1.DeleteOptions{}, - metav1.ListOptions{ - Limit: 1000, - }) - if err != nil { - t.Fatalf("Failed to cleanup Pods: %v", err) - } - - podsDelete, errList2 := clientSet.CoreV1().Pods(ns.Namespace).List(ctx, metav1.ListOptions{}) - if errList != nil { - t.Fatalf("Failed to list pods: %v", errList2) - } - for _, val := range podsDelete.Items { - if val.DeletionTimestamp == nil { - t.Fatalf("Deletion not registered.") + if tc.failPodsInsteadOfDeletion { + err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount)) + if err != nil { + t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } + } else { + addFinalizerAndDeletePods(ctx, t, clientSet, ns.Name) } validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{ @@ -2985,3 +2975,32 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri }) return job, err } + +func addFinalizerAndDeletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) { + t.Helper() + pods, errList := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if errList != nil { + t.Fatalf("Failed to list pods: %v", errList) + } + updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) { + pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion") + }) + err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx, + metav1.DeleteOptions{}, + metav1.ListOptions{ + Limit: 1000, + }) + if err != nil { + t.Fatalf("Failed to cleanup Pods: %v", err) + } + + podsDelete, errList2 := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if errList2 != nil { + t.Fatalf("Failed to list pods: %v", errList2) + } + for _, val := range podsDelete.Items { + if val.DeletionTimestamp == nil { + t.Fatalf("Deletion not registered.") + } + } +} From bcf1c113f4e3e6123cfe9078d4aef2fea8e796b2 Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Tue, 17 Oct 2023 18:04:25 +0200 Subject: [PATCH 2/6] cleanup: add new test cases for failed pod replacement policy instead of editing existing ones --- test/integration/job/job_test.go | 52 ++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 742df86e0a1..c3bc18ccc2c 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1681,25 +1681,29 @@ func TestJobPodReplacementPolicy(t *testing.T) { } cases := map[string]struct { podReplacementPolicyEnabled bool - failPodsInsteadOfDeletion bool + deletePods bool + failPods bool wantTerminating *int32 wantFailed int wantActive int jobSpec *batchv1.JobSpec }{ "feature flag off, delete pods and verify no terminating status": { + deletePods: true, jobSpec: jobSpecIndexedDefault, wantActive: int(podCount), wantFailed: int(podCount), }, "feature flag true, delete pods and verify terminating status": { podReplacementPolicyEnabled: true, + deletePods: true, jobSpec: jobSpecIndexedDefault, wantTerminating: pointer.Int32(podCount), wantFailed: int(podCount), }, "feature flag true, delete pods, verify terminating status and recreate upon terminating": { podReplacementPolicyEnabled: true, + deletePods: true, jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), @@ -1711,32 +1715,29 @@ func TestJobPodReplacementPolicy(t *testing.T) { }, "feature flag true, delete pods, verify terminating status and recreate once failed": { podReplacementPolicyEnabled: true, + deletePods: true, jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, - failPodsInsteadOfDeletion: true, - wantActive: int(podCount), - wantFailed: int(podCount), - wantTerminating: pointer.Int32(0), + wantTerminating: pointer.Int32(podCount), }, "feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": { podReplacementPolicyEnabled: true, + deletePods: true, jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, - failPodsInsteadOfDeletion: true, - wantActive: int(podCount), - wantFailed: int(podCount), - wantTerminating: pointer.Int32(0), + wantTerminating: pointer.Int32(podCount), }, "feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": { podReplacementPolicyEnabled: false, + deletePods: true, jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), @@ -1756,6 +1757,32 @@ func TestJobPodReplacementPolicy(t *testing.T) { }, wantActive: int(podCount), }, + "feature flag true, delete pods, verify active and failed status and recreate once failed": { + podReplacementPolicyEnabled: true, + failPods: true, + jobSpec: &batchv1.JobSpec{ + Parallelism: pointer.Int32Ptr(podCount), + Completions: pointer.Int32Ptr(podCount), + CompletionMode: &nonIndexedCompletion, + PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), + }, + wantActive: int(podCount), + wantFailed: int(podCount), + wantTerminating: pointer.Int32(0), + }, + "feature flag true with NonIndexedJob, delete pods, verify active and failed status and recreate once failed": { + podReplacementPolicyEnabled: true, + failPods: true, + jobSpec: &batchv1.JobSpec{ + Parallelism: pointer.Int32Ptr(podCount), + Completions: pointer.Int32Ptr(podCount), + CompletionMode: &nonIndexedCompletion, + PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), + }, + wantActive: int(podCount), + wantFailed: int(podCount), + wantTerminating: pointer.Int32(0), + }, } for name, tc := range cases { tc := tc @@ -1791,13 +1818,14 @@ func TestJobPodReplacementPolicy(t *testing.T) { if err != nil { t.Fatalf("Error waiting for Job pods to become active: %v", err) } - if tc.failPodsInsteadOfDeletion { + if tc.deletePods { + addFinalizerAndDeletePods(ctx, t, clientSet, ns.Name) + } + if tc.failPods { err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount)) if err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } - } else { - addFinalizerAndDeletePods(ctx, t, clientSet, ns.Name) } validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{ From e73edf7764fc365e91549a123959c26be1678e91 Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Tue, 17 Oct 2023 18:15:55 +0200 Subject: [PATCH 3/6] fix: typo in indexed & non-indexed completion policies for failed pod replacement policy integration tests --- test/integration/job/job_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index c3bc18ccc2c..4638e4e5778 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1719,7 +1719,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), - CompletionMode: &nonIndexedCompletion, + CompletionMode: &indexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, wantTerminating: pointer.Int32(podCount), @@ -1763,7 +1763,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { jobSpec: &batchv1.JobSpec{ Parallelism: pointer.Int32Ptr(podCount), Completions: pointer.Int32Ptr(podCount), - CompletionMode: &nonIndexedCompletion, + CompletionMode: &indexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, wantActive: int(podCount), From 056b25dfcabbcdb2d942837828849ab797bfa1d7 Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Tue, 17 Oct 2023 19:18:58 +0200 Subject: [PATCH 4/6] fix: improve PodReplacementPolicy integration test case names and update deprecated methods --- test/integration/job/job_test.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4638e4e5778..8fc67985f4b 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "k8s.io/utils/ptr" "sort" "strconv" "strings" @@ -1757,31 +1758,31 @@ func TestJobPodReplacementPolicy(t *testing.T) { }, wantActive: int(podCount), }, - "feature flag true, delete pods, verify active and failed status and recreate once failed": { + "feature flag true, recreate failed pods, and verify active and failed counters": { podReplacementPolicyEnabled: true, failPods: true, jobSpec: &batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(podCount), - Completions: pointer.Int32Ptr(podCount), + Parallelism: ptr.To(podCount), + Completions: ptr.To(podCount), CompletionMode: &indexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, wantActive: int(podCount), wantFailed: int(podCount), - wantTerminating: pointer.Int32(0), + wantTerminating: ptr.To[int32](0), }, - "feature flag true with NonIndexedJob, delete pods, verify active and failed status and recreate once failed": { + "feature flag true with NonIndexedJob, recreate failed pods, and verify active and failed counters": { podReplacementPolicyEnabled: true, failPods: true, jobSpec: &batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(podCount), - Completions: pointer.Int32Ptr(podCount), + Parallelism: ptr.To(podCount), + Completions: ptr.To(podCount), CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.Failed), }, wantActive: int(podCount), wantFailed: int(podCount), - wantTerminating: pointer.Int32(0), + wantTerminating: ptr.To[int32](0), }, } for name, tc := range cases { From 2ccf7e8e49b15e1e9534b4cb3ddbe21459f2fe2c Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Tue, 17 Oct 2023 20:07:20 +0200 Subject: [PATCH 5/6] fix: minor lint issues and redundant check --- test/integration/job/job_test.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 8fc67985f4b..0fa8aaa3145 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "k8s.io/utils/ptr" "sort" "strconv" "strings" @@ -59,6 +58,7 @@ import ( "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) const waitInterval = time.Second @@ -3007,14 +3007,14 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri func addFinalizerAndDeletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) { t.Helper() - pods, errList := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) - if errList != nil { - t.Fatalf("Failed to list pods: %v", errList) + pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) } updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) { pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion") }) - err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx, + err = clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ Limit: 1000, @@ -3022,14 +3022,8 @@ func addFinalizerAndDeletePods(ctx context.Context, t *testing.T, clientSet clie if err != nil { t.Fatalf("Failed to cleanup Pods: %v", err) } - - podsDelete, errList2 := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) - if errList2 != nil { - t.Fatalf("Failed to list pods: %v", errList2) - } - for _, val := range podsDelete.Items { - if val.DeletionTimestamp == nil { - t.Fatalf("Deletion not registered.") - } + _, err = clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) } } From fad4430f9e67e6b9e92af26295363669f15f98c4 Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Tue, 17 Oct 2023 20:30:54 +0200 Subject: [PATCH 6/6] cleanup: remove redundant logic in PodReplacementPolicy integration tests --- test/integration/job/job_test.go | 48 ++++++-------------------------- 1 file changed, 8 insertions(+), 40 deletions(-) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 0fa8aaa3145..19ce119d6b8 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1820,7 +1820,14 @@ func TestJobPodReplacementPolicy(t *testing.T) { t.Fatalf("Error waiting for Job pods to become active: %v", err) } if tc.deletePods { - addFinalizerAndDeletePods(ctx, t, clientSet, ns.Name) + err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx, + metav1.DeleteOptions{}, + metav1.ListOptions{ + Limit: 1000, + }) + if err != nil { + t.Fatalf("Failed to delete Pods: %v", err) + } } if tc.failPods { err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount)) @@ -2805,22 +2812,6 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat return int(updated), nil } -func updatePod(t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) { - for _, val := range pods { - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(context.TODO(), val.Name, metav1.GetOptions{}) - if err != nil { - return err - } - updateFunc(newPod) - _, err = clientSet.CoreV1().Pods(val.Namespace).Update(context.TODO(), newPod, metav1.UpdateOptions{}) - return err - }); err != nil { - t.Fatalf("Failed to update pod %s: %v", val.Name, err) - } - } -} - func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -3004,26 +2995,3 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri }) return job, err } - -func addFinalizerAndDeletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) { - t.Helper() - pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - t.Fatalf("Failed to list pods: %v", err) - } - updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) { - pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion") - }) - err = clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx, - metav1.DeleteOptions{}, - metav1.ListOptions{ - Limit: 1000, - }) - if err != nil { - t.Fatalf("Failed to cleanup Pods: %v", err) - } - _, err = clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - t.Fatalf("Failed to list pods: %v", err) - } -}