Make Job PodFailurePolicy e2e tests resilient to random failures to prepare them for conformance (#126169)

* Make JobPodFailurePolicy tests for ignore resilient to random failures

* Increase parallelism and evict in parallel

* Code review fixes to the job e2e tests
This commit is contained in:
Michał Woźniak 2024-09-17 19:00:19 +02:00 committed by GitHub
parent bfd91fbb3e
commit d6e5fb4a75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 104 additions and 154 deletions

View File

@ -39,6 +39,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
batchinternal "k8s.io/kubernetes/pkg/apis/batch" batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ejob "k8s.io/kubernetes/test/e2e/framework/job" e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
@ -140,86 +141,39 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)
}) })
ginkgo.It("should allow to use the pod failure policy to not count the failure towards the backoffLimit", func(ctx context.Context) {
// We set the backoffLimit to 0 so that any pod failure would trigger
// job failure if not for the pod failure policy to ignore the failed
// pods from counting them towards the backoffLimit. Also, we fail the
// pod only once so that the job eventually succeeds.
// In order to ensure a Job's pod fails once before succeeding we force
// the Job's Pods to be scheduled to a single Node and use a hostPath
// volume to persist data across new Pods.
parallelism := int32(2)
completions := int32(4)
backoffLimit := int32(0)
ginkgo.By("Looking for a node to schedule job pod")
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err)
ginkgo.By("Creating a job")
job := e2ejob.NewTestJobOnNode("failOnce", "pod-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionIgnore,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
},
}
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
})
/* /*
Testname: Ensure pod failure policy allows to ignore failure for an evicted pod; matching on the exit code Testname: Ensure pod failure policy allows to ignore failure matching on the exit code
Description: This test is using an indexed job. The pod corresponding to the 0th index Description: This test is using an indexed job. The pod corresponding to each index
creates a marker file on the host and runs 'forever' until evicted. We use creates a marker file on the host and runs 'forever' until evicted. Once
the non-0-indexed pods to determine if the marker file is already the marker file is created the pod succeeds seeing it on restart. Thus,
created by the 0th indexed pod - the non-0-indexed pods fail and restart we trigger one failure per index due to eviction, so the Job would be
until the marker file is created (their potential failures are ignored marked as failed, if not for the ignore rule matching on exit codes.
based on the exit code). Once the marker file is created the 0th indexed
pod is evicted (DisruptionTarget condition is added in the process),
after restart it runs to successful completion.
Steps:
1. Select a node to run all Job's pods to ensure the host marker file is accessible by all pods
2. Create the indexed job with pod failure policy which ignores failed pods with 137 exit code
3. Await for all non-0-indexed pods to succeed to ensure the marker file is created by the 0-indexed pod
4. Make sure the 0-indexed pod is running
5. Evict the 0-indexed pod, the failure is ignored as it matches the pod failure policy
6. Await for the job to successfully complete
*/ */
ginkgo.It("should allow to use a pod failure policy to ignore failure for an evicted pod; matching on the exit code", func(ctx context.Context) { ginkgo.It("should allow to use a pod failure policy to ignore failure matching on exit code", func(ctx context.Context) {
// We set the backoffLimit to 0 so that any pod failure would trigger // We set the backoffLimit = numPods-1 so that we can tolerate random
// job failure if not for the pod failure policy to ignore the failed // failures (like OutOfPods from kubelet). Yet, the Job would fail if the
// pods from counting them towards the backoffLimit. // pod failures were not be ignored.
parallelism := int32(2) numPods := 3
completions := int32(4) parallelism := int32(numPods)
backoffLimit := int32(0) completions := int32(numPods)
backoffLimit := int32(numPods) - 1
ginkgo.By("Looking for a node to schedule job pods") ginkgo.By("Looking for a node to schedule job pods")
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet) node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err) framework.ExpectNoError(err)
ginkgo.By("Creating a job") ginkgo.By("Creating a job")
job := e2ejob.NewTestJobOnNode("notTerminateOnce", "evicted-pod-ignore-on-exit-code", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name) job := e2ejob.NewTestJobOnNode("notTerminateOncePerIndex", "evicted-pod-ignore-on-exit-code", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion) job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion)
job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{ job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{ Rules: []batchv1.PodFailurePolicyRule{
{ {
// Ignore failures of the non 0-indexed pods which fail until the marker file is created // Ignore the pod failure caused by the eviction based on the
// And the 137 in the 0-indexed pod due to eviction. // exit code corresponding to SIGKILL.
Action: batchv1.PodFailurePolicyActionIgnore, Action: batchv1.PodFailurePolicyActionIgnore,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{ OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn, Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1, 137}, Values: []int32{137},
}, },
}, },
}, },
@ -227,82 +181,70 @@ var _ = SIGDescribe("Job", func() {
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for all non 0-indexed pods to succeed to ensure the marker file is created") ginkgo.By("Waiting for all the pods to be ready")
err = e2ejob.WaitForJobPodsSucceeded(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions-1) err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(int32(numPods)))
framework.ExpectNoError(err, "failed to await for all non 0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace) framework.ExpectNoError(err, "failed to await for all pods to be ready for job: %s/%s", job.Name, job.Namespace)
ginkgo.By("Awaiting for the 0-indexed pod to be running")
err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
framework.ExpectNoError(err, "failed to await for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)
ginkgo.By("Fetch all running pods")
pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name) pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace) framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
gomega.Expect(pods).To(gomega.HaveLen(1), "Exactly one running pod is expected") gomega.Expect(pods).To(gomega.HaveLen(numPods), "Number of running pods doesn't match parallelism")
pod := pods[0]
ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
evictTarget := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(ctx, evictTarget)
framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace)) ginkgo.By("Evict all the Pods")
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete) workqueue.ParallelizeUntil(ctx, numPods, numPods, func(index int) {
framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) defer ginkgo.GinkgoRecover()
pod := pods[index]
ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
evictTarget := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(ctx, evictTarget)
framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
framework.ExpectNoError(err, "failed to await for all pods to be deleted: %s/%s", pod.Name, pod.Namespace)
})
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, nil, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
}) })
/* /*
Testname: Ensure pod failure policy allows to ignore failure for an evicted pod; matching on the DisruptionTarget condition Testname: Ensure pod failure policy allows to ignore failure matching on the DisruptionTarget condition
Description: This test is using an indexed job. The pod corresponding to the 0th index Description: This test is using an indexed job. The pod corresponding to each index
creates a marker file on the host and runs 'forever' until evicted. We use creates a marker file on the host and runs 'forever' until evicted. Once
the non-0-indexed pods to determine if the marker file is already the marker file is created the pod succeeds seeing it on restart. Thus,
created by the 0th indexed pod - the non-0-indexed pods fail and restart we trigger one failure per index due to eviction (DisruptionTarget
until the marker file is created (their potential failures are ignored condition is added in the process). The Job would be marked as failed,
based on the exit code). Once the marker file is created the 0th indexed if not for the ignore rule matching on exit codes.
pod is evicted (DisruptionTarget condition is added in the process),
after restart it runs to successful completion.
Steps:
1. Select a node to run all Job's pods to ensure the host marker file is accessible by all pods
2. Create the indexed job with pod failure policy which ignores failed pods with DisruptionTarget condition
3. Await for all non-0-indexed pods to succeed to ensure the marker file is created by the 0-indexed pod
4. Make sure the 0-indexed pod is running
5. Evict the 0-indexed pod, the failure is ignored as it matches the pod failure policy
6. Await for the job to successfully complete
*/ */
ginkgo.It("should allow to use a pod failure policy to ignore failure for an evicted pod; matching on the DisruptionTarget condition", func(ctx context.Context) { ginkgo.It("should allow to use a pod failure policy to ignore failure matching on DisruptionTarget condition", func(ctx context.Context) {
// We set the backoffLimit to 0 so that any pod failure would trigger // We set the backoffLimit = numPods-1 so that we can tolerate random
// job failure if not for the pod failure policy to ignore the failed // failures (like OutOfPods from kubelet). Yet, the Job would fail if the
// pods from counting them towards the backoffLimit. // pod failures were not be ignored.
parallelism := int32(2) numPods := 3
completions := int32(4) parallelism := int32(numPods)
backoffLimit := int32(0) completions := int32(numPods)
backoffLimit := int32(numPods) - 1
ginkgo.By("Looking for a node to schedule job pods") ginkgo.By("Looking for a node to schedule job pods")
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet) node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err) framework.ExpectNoError(err)
ginkgo.By("Creating a job") ginkgo.By("Creating a job")
job := e2ejob.NewTestJobOnNode("notTerminateOnce", "evicted-pod-ignore-on-disruption-condition", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name) job := e2ejob.NewTestJobOnNode("notTerminateOncePerIndex", "evicted-pod-ignore-on-disruption-condition", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion) job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion)
job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{ job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{ Rules: []batchv1.PodFailurePolicyRule{
{ {
// Ignore failures of the non 0-indexed pods which fail until the marker file is created // Ignore the pod failure caused by the eviction based on the
Action: batchv1.PodFailurePolicyActionIgnore, // DisruptionTarget condition
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{1},
},
},
{
// Ignore the pod failure caused by the eviction
Action: batchv1.PodFailurePolicyActionIgnore, Action: batchv1.PodFailurePolicyActionIgnore,
OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{ OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
{ {
@ -316,34 +258,37 @@ var _ = SIGDescribe("Job", func() {
job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for all non 0-indexed pods to succeed to ensure the marker file is created") ginkgo.By("Waiting for all the pods to be ready")
err = e2ejob.WaitForJobPodsSucceeded(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions-1) err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(int32(numPods)))
framework.ExpectNoError(err, "failed to await for all non 0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace) framework.ExpectNoError(err, "failed to await for all pods to be ready for job: %s/%s", job.Name, job.Namespace)
ginkgo.By("Awaiting for the 0-indexed pod to be running")
err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
framework.ExpectNoError(err, "failed to await for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)
ginkgo.By("Fetch all running pods")
pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name) pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace) framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
gomega.Expect(pods).To(gomega.HaveLen(1), "Exactly one running pod is expected") gomega.Expect(pods).To(gomega.HaveLen(numPods), "Number of running pods doesn't match parallelism")
pod := pods[0]
ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
evictTarget := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(ctx, evictTarget)
framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace)) ginkgo.By("Evict all the Pods")
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete) workqueue.ParallelizeUntil(ctx, numPods, numPods, func(index int) {
framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace) defer ginkgo.GinkgoRecover()
pod := pods[index]
ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
evictTarget := &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(ctx, evictTarget)
framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
framework.ExpectNoError(err, "failed to await for all pods to be deleted: %s/%s", pod.Name, pod.Namespace)
})
ginkgo.By("Ensuring job reaches completions") ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To(batchv1.JobReasonCompletionsReached), completions) err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, nil, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
}) })

View File

@ -136,27 +136,32 @@ func NewTestJobOnNode(behavior, name string, rPol v1.RestartPolicy, parallelism,
exit 1 exit 1
fi fi
`} `}
case "notTerminateOnce": case "notTerminateOncePerIndex":
// Do not terminate the 0-indexed pod in the first run and // Use marker files per index. If the given marker file already exists
// succeed the second time. Fail the non-0-indexed pods until // then terminate successfully. Otherwise create the marker file and
// the marker file is created by the 0-indexed pod. The fact that // sleep "forever" awaiting delete request.
// the non-0-indexed pods are succeeded is used to determine that the
// 0th indexed pod already created the marker file.
setupHostPathDirectory(job) setupHostPathDirectory(job)
job.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(1)) job.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(1))
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c"} job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c"}
job.Spec.Template.Spec.Containers[0].Args = []string{` job.Spec.Template.Spec.Containers[0].Args = []string{`
if [[ -r /data/foo ]] if [[ -r /data/foo-$JOB_COMPLETION_INDEX ]]
then then
exit 0 exit 0
elif [[ $JOB_COMPLETION_INDEX -eq 0 ]]
then
touch /data/foo
sleep 1000000
else else
exit 1 touch /data/foo-$JOB_COMPLETION_INDEX
sleep 1000000
fi fi
`} `}
// Add readiness probe to allow the test client to check if the marker
// file is already created before evicting the Pod.
job.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
PeriodSeconds: 1,
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sh", "-c", "cat /data/foo-$JOB_COMPLETION_INDEX"},
},
},
}
} }
return job return job
} }