Fix tracking of terminating Pods when nothing else changes (#121342)

* cleanup: refactor pod replacement policy integration test into staged assertion

* cleanup: fix typo in job_test.go

* refactor PodReplacementPolicy test and remove test for defaulting the policy

* fix missing status update in the job controller when only the terminating count changes, and refactor the pod replacement policy integration test

* use t.Cleanup instead of defer in PodReplacementPolicy integration tests

* revert t.Cleanup to defer for resetting the feature flag in PodReplacementPolicy integration tests
Dejan Zele Pejchev 2023-10-24 15:04:46 +02:00 committed by GitHub
parent 9aa04752e7
commit f8a4e343a1
2 changed files with 210 additions and 102 deletions

pkg/controller/job/job_controller.go

@@ -914,10 +914,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	}
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
+	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
 	job.Status.Active = active
 	job.Status.Ready = ready
 	job.Status.Terminating = jobCtx.terminating
-	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
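
The moved line above is the whole fix: the old code compared job.Status.Terminating with jobCtx.terminating only after the assignment had already made the two sides equal, so a sync in which the terminating count was the only change never set needsStatusUpdate and the status write was skipped. A minimal standalone sketch of that ordering trap (ptrEqual here is a local stand-in for k8s.io/utils/ptr.Equal, and the variable names are illustrative, not the controller's):

	package main

	import "fmt"

	// ptrEqual mirrors k8s.io/utils/ptr.Equal: true when both pointers are
	// nil, or both are non-nil and point to equal values.
	func ptrEqual[T comparable](a, b *T) bool {
		if (a == nil) != (b == nil) {
			return false
		}
		return a == nil || *a == *b
	}

	func ptrTo[T any](v T) *T { return &v }

	func main() {
		status := ptrTo[int32](0)  // stands in for job.Status.Terminating, as last written
		counted := ptrTo[int32](2) // stands in for jobCtx.terminating, freshly recounted

		// Buggy order: assign first, compare second. The comparison now sees
		// the same pointer on both sides and can never report a change.
		status = counted
		fmt.Println("needs update:", !ptrEqual(status, counted)) // false, update skipped

		// Fixed order: compare against the old value, then assign.
		status = ptrTo[int32](0)
		needsUpdate := !ptrEqual(status, counted) // true
		status = counted
		fmt.Println("needs update:", needsUpdate, "terminating:", *status)
	}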

test/integration/job/job_test.go

@@ -1690,81 +1690,102 @@ func TestIndexedJob(t *testing.T) {
 }
 func TestJobPodReplacementPolicy(t *testing.T) {
-	const podCount int32 = 2
 	indexedCompletion := batchv1.IndexedCompletion
 	nonIndexedCompletion := batchv1.NonIndexedCompletion
 	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
 		return &obj
 	}
-	jobSpecIndexedDefault := &batchv1.JobSpec{
-		Parallelism:    ptr.To(podCount),
-		Completions:    ptr.To(podCount),
-		CompletionMode: &indexedCompletion,
-	}
+	type jobStatus struct {
+		active      int
+		failed      int
+		terminating *int32
+	}
 	cases := map[string]struct {
 		podReplacementPolicyEnabled bool
-		deletePods                  bool
-		failPods                    bool
-		wantTerminating             *int32
-		wantFailed                  int
-		wantActive                  int
 		jobSpec                     *batchv1.JobSpec
+		wantStatusAfterDeletion     jobStatus
+		wantStatusAfterFailure      jobStatus
 	}{
-		"feature flag off, delete pods and verify no terminating status": {
-			deletePods: true,
-			jobSpec:    jobSpecIndexedDefault,
-			wantActive: int(podCount),
-			wantFailed: int(podCount),
-		},
-		"feature flag true, delete pods and verify terminating status": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec:                     jobSpecIndexedDefault,
-			wantTerminating:             ptr.To(podCount),
-			wantFailed:                  int(podCount),
-		},
-		"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &indexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
-			},
-			wantTerminating: ptr.To(podCount),
-			wantFailed:      int(podCount),
-		},
-		"feature flag true, delete pods, verify terminating status and recreate once failed": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &nonIndexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
-			},
-			wantTerminating: ptr.To(podCount),
-		},
-		"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
-			podReplacementPolicyEnabled: true,
-			deletePods:                  true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &nonIndexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
-			},
-			wantTerminating: ptr.To(podCount),
-		},
+		"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:    ptr.To[int32](2),
+				Completions:    ptr.To[int32](2),
+				CompletionMode: &indexedCompletion,
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active: 2,
+				failed: 2,
+			},
+			wantStatusAfterFailure: jobStatus{
+				active: 2,
+				failed: 2,
+			},
+		},
+		"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
+				CompletionMode:       &indexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
+			},
+		},
+		"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
+				CompletionMode:       &indexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
+			},
+		},
-		"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
+		"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: false,
-			deletePods:                  true,
 			jobSpec: &batchv1.JobSpec{
-				Parallelism:    ptr.To(podCount),
-				Completions:    ptr.To(podCount),
+				Parallelism:    ptr.To[int32](2),
+				Completions:    ptr.To[int32](2),
 				CompletionMode: &nonIndexedCompletion,
 				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
 				PodFailurePolicy: &batchv1.PodFailurePolicy{
 					Rules: []batchv1.PodFailurePolicyRule{
 						{
@@ -1777,33 +1798,60 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 					},
 				},
 			},
-			wantActive: int(podCount),
-		},
-		"feature flag true, recreate failed pods, and verify active and failed counters": {
-			podReplacementPolicyEnabled: true,
-			failPods:                    true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &indexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
-			},
-			wantActive:      int(podCount),
-			wantFailed:      int(podCount),
-			wantTerminating: ptr.To[int32](0),
-		},
-		"feature flag true with NonIndexedJob, recreate failed pods, and verify active and failed counters": {
-			podReplacementPolicyEnabled: true,
-			failPods:                    true,
-			jobSpec: &batchv1.JobSpec{
-				Parallelism:          ptr.To(podCount),
-				Completions:          ptr.To(podCount),
-				CompletionMode:       &nonIndexedCompletion,
-				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
-			},
-			wantActive:      int(podCount),
-			wantFailed:      int(podCount),
-			wantTerminating: ptr.To[int32](0),
-		},
+			wantStatusAfterDeletion: jobStatus{
+				active: 2,
+			},
+			wantStatusAfterFailure: jobStatus{
+				active: 2,
+			},
+		},
+		"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
+				CompletionMode:       &indexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      0,
+				failed:      0,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
+			},
+		},
+		"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
+			podReplacementPolicyEnabled: true,
+			jobSpec: &batchv1.JobSpec{
+				Parallelism:          ptr.To[int32](2),
+				Completions:          ptr.To[int32](2),
+				CompletionMode:       &nonIndexedCompletion,
+				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
+				Template: v1.PodTemplateSpec{
+					ObjectMeta: metav1.ObjectMeta{
+						Finalizers: []string{"fake.example.com/blockDeletion"},
+					},
+				},
+			},
+			wantStatusAfterDeletion: jobStatus{
+				active:      0,
+				failed:      0,
+				terminating: ptr.To[int32](2),
+			},
+			wantStatusAfterFailure: jobStatus{
+				active:      2,
+				failed:      2,
+				terminating: ptr.To[int32](0),
+			},
+		},
 	}
 	for name, tc := range cases {
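
A note on reading the staged expectations above: with the TerminatingOrFailed policy the controller counts and replaces pods as soon as they begin terminating, so right after deletion those cases expect active 2, failed 2, terminating 2. With the Failed policy nothing is counted or replaced until the pods reach a terminal phase, so after deletion those cases expect active 0, failed 0, terminating 2, and the replacements only appear once failTerminatingPods (in the hunks below) drives the pods to Failed.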
@@ -1813,9 +1861,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
 
 			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
-			defer closeFn()
+			t.Cleanup(closeFn)
 			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
-			defer cancel()
+			t.Cleanup(cancel)
 			resetMetrics()
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
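
Two of the cleanup-related commits are visible here: closeFn and cancel move to t.Cleanup, while the feature-gate reset stays a defer (the final commit in this PR reverted that one back). The distinction is ordering: deferred calls run while the test function is returning, before any t.Cleanup callbacks, which then run in LIFO order, so the gate is restored before the later cleanups tear the environment down. A small sketch of that ordering (an illustrative test, not code from this PR):

	package example

	import (
		"fmt"
		"testing"
	)

	func TestCleanupOrdering(t *testing.T) {
		t.Cleanup(func() { fmt.Println("3: the first-registered cleanup runs last") })
		t.Cleanup(func() { fmt.Println("2: cleanups run LIFO, after the test returns") })
		defer fmt.Println("1: defers run first, while the test is returning")
	}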
@@ -1826,41 +1874,24 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			}
 			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
 
-			// Wait for pods to start up.
-			err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
-				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if job.Status.Active == podCount {
-					return true, nil
-				}
-				return false, nil
-			})
-			if err != nil {
-				t.Fatalf("Error waiting for Job pods to become active: %v", err)
-			}
-			if tc.deletePods {
-				err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-					metav1.DeleteOptions{},
-					metav1.ListOptions{
-						Limit: 1000,
-					})
-				if err != nil {
-					t.Fatalf("Failed to delete Pods: %v", err)
-				}
-			}
-			if tc.failPods {
-				err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount))
-				if err != nil {
-					t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-				}
-			}
+			waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
+			t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
+
+			deletePods(ctx, t, clientSet, ns.Name)
+
 			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
-				Terminating: tc.wantTerminating,
-				Failed:      tc.wantFailed,
-				Active:      tc.wantActive,
+				Terminating: tc.wantStatusAfterDeletion.terminating,
+				Failed:      tc.wantStatusAfterDeletion.failed,
+				Active:      tc.wantStatusAfterDeletion.active,
 				Ready:       ptr.To[int32](0),
 			})
 
+			failTerminatingPods(ctx, t, clientSet, ns.Name)
+			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+				Terminating: tc.wantStatusAfterFailure.terminating,
+				Failed:      tc.wantStatusAfterFailure.failed,
+				Active:      tc.wantStatusAfterFailure.active,
+				Ready:       ptr.To[int32](0),
+			})
 		})
@@ -3022,3 +3053,80 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName string, updateFunc func(*batchv1.Job)) (*batchv1.Job, error) {
 	})
 	return job, err
 }
+
+func waitForPodsToBeActive(ctx context.Context, t *testing.T, jobClient typedv1.JobInterface, podCount int32, jobObj *batchv1.Job) {
+	t.Helper()
+	err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(context.Context) (done bool, err error) {
+		job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return job.Status.Active == podCount, nil
+	})
+	if err != nil {
+		t.Fatalf("Error waiting for Job pods to become active: %v", err)
+	}
+}
+
+func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx,
+		metav1.DeleteOptions{},
+		metav1.ListOptions{
+			Limit: 1000,
+		})
+	if err != nil {
+		t.Fatalf("Failed to cleanup Pods: %v", err)
+	}
+}
+
+func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
+		for i, finalizer := range pod.Finalizers {
+			if finalizer == "fake.example.com/blockDeletion" {
+				pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
+			}
+		}
+	})
+}
+
+func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
+	t.Helper()
+	for _, val := range pods {
+		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+			newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+			updateFunc(newPod)
+			_, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
+			return err
+		}); err != nil {
+			t.Fatalf("Failed to update pod %s: %v", val.Name, err)
+		}
+	}
+}
+
+func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
+	t.Helper()
+	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list pods: %v", err)
+	}
+	var terminatingPods []v1.Pod
+	for _, pod := range pods.Items {
+		if pod.DeletionTimestamp != nil {
+			pod.Status.Phase = v1.PodFailed
+			terminatingPods = append(terminatingPods, pod)
+		}
+	}
+	_, err = updatePodStatuses(ctx, clientSet, terminatingPods)
+	if err != nil {
+		t.Fatalf("Failed to update pod statuses: %v", err)
+	}
+}
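
These helpers hinge on the fake.example.com/blockDeletion finalizer set in the pod templates: with the finalizer in place, DeleteCollection can only stamp each pod with a deletionTimestamp, so the pods linger as terminating until failTerminatingPods moves them to a terminal phase (and the t.Cleanup registered earlier strips the finalizer so the namespace can actually be emptied). A standalone sketch of how such a terminating count can be derived from a pod list; this is an assumption about the shape of the accounting, not the controller's exact helper:

	package main

	import (
		"fmt"
		"time"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// countTerminating treats a pod as terminating once it has a deletion
	// timestamp but has not yet reached a terminal phase. Failing a
	// terminating pod therefore removes it from this count, which is why
	// the wantStatusAfterFailure cases expect terminating to drop to 0.
	func countTerminating(pods []v1.Pod) int32 {
		var n int32
		for _, p := range pods {
			if p.DeletionTimestamp != nil &&
				p.Status.Phase != v1.PodFailed &&
				p.Status.Phase != v1.PodSucceeded {
				n++
			}
		}
		return n
	}

	func main() {
		now := metav1.NewTime(time.Now())
		pods := []v1.Pod{
			{ObjectMeta: metav1.ObjectMeta{Name: "a", DeletionTimestamp: &now}},
			{
				ObjectMeta: metav1.ObjectMeta{Name: "b", DeletionTimestamp: &now},
				Status:     v1.PodStatus{Phase: v1.PodFailed},
			},
			{ObjectMeta: metav1.ObjectMeta{Name: "c"}},
		}
		fmt.Println(countTerminating(pods)) // 1: only "a" still counts as terminating
	}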