From 41285a7c91c395f585263c0f135b2f0713251a1b Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Wed, 9 Nov 2022 16:44:52 +0100
Subject: [PATCH] Add e2e test for job pod failure policy used to match pod disruption

---
 test/e2e/apps/job.go               | 97 ++++++++++++++++++++++++++++--
 test/e2e/framework/job/fixtures.go | 33 +++++++---
 test/e2e/framework/job/rest.go     | 15 +++++
 test/e2e/framework/job/wait.go     | 18 ++++--
 test/e2e/scheduling/nvidia-gpus.go |  2 +-
 test/e2e/upgrades/apps/job.go      |  2 +-
 6 files changed, 146 insertions(+), 21 deletions(-)

diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go
index dc445a4a982..83fdf70ee2e 100644
--- a/test/e2e/apps/job.go
+++ b/test/e2e/apps/job.go
@@ -25,6 +25,7 @@ import (
 
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -169,6 +170,92 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
 	})
 
+	// This test uses an indexed job. The pod corresponding to index 0
+	// creates a marker file on the host and runs 'forever' until evicted. The
+	// non-0-indexed pods are used to determine whether the marker file has
+	// already been created by the 0-indexed pod - they fail and restart
+	// until the marker file exists (their failures are ignored
+	// based on the exit code). Once the marker file is created, the 0-indexed
+	// pod is evicted (the DisruptionTarget condition is added in the process);
+	// after the restart it runs to successful completion.
+	// Steps:
+	// 1. Select a node to run all of the Job's pods, so the host marker file is accessible to all pods
+	// 2. Create the indexed job
+	// 3. Wait for all non-0-indexed pods to succeed, which confirms the marker file was created by the 0-indexed pod
+	// 4. Make sure the 0-indexed pod is running
+	// 5. Evict the 0-indexed pod
+	// 6. Wait for the job to complete successfully
+	ginkgo.It("should allow to use the pod failure policy to not count pod disruption towards the backoffLimit", func() {
+		mode := batchv1.IndexedCompletion
+
+		// We set the backoffLimit to 0 so that any pod failure would trigger
+		// job failure, if not for the pod failure policy ignoring the failed
+		// pods so that they are not counted towards the backoffLimit.
+		backoffLimit := int32(0)
+
+		ginkgo.By("Looking for a node to schedule job pods")
+		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Creating a job")
+		job := e2ejob.NewTestJobOnNode("notTerminateOnce", "pod-disruption-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
+		job.Spec.CompletionMode = &mode
+		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
+			Rules: []batchv1.PodFailurePolicyRule{
+				{
+					// Ignore failures of the non-0-indexed pods, which fail until the marker file is created
+					Action: batchv1.PodFailurePolicyActionIgnore,
+					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
+						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
+						Values:   []int32{1},
+					},
+				},
+				{
+					// Ignore the pod failure caused by the eviction
+					Action: batchv1.PodFailurePolicyActionIgnore,
+					OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
+						{
+							Type:   v1.DisruptionTarget,
+							Status: v1.ConditionTrue,
+						},
+					},
+				},
+			},
+		}
+		job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Waiting for all non-0-indexed pods to succeed to ensure the marker file is created")
+		err = e2ejob.WaitForJobPodsSucceeded(f.ClientSet, f.Namespace.Name, job.Name, completions-1)
+		framework.ExpectNoError(err, "failed to wait for all non-0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace)
+
+		ginkgo.By("Waiting for the 0-indexed pod to be running")
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
+		framework.ExpectNoError(err, "failed to wait for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)
+
+		pods, err := e2ejob.GetAllRunningJobPods(f.ClientSet, f.Namespace.Name, job.Name)
+		framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
+		framework.ExpectEqual(len(pods), 1, "Exactly one running pod is expected")
+		pod := pods[0]
+		ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
+		evictTarget := &policyv1.Eviction{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			},
+		}
+		err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(context.TODO(), evictTarget)
+		framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
+
+		ginkgo.By(fmt.Sprintf("Waiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
+		err = e2epod.WaitForPodNotFoundInNamespace(f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
+		framework.ExpectNoError(err, "failed to wait for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)
+
+		ginkgo.By("Ensuring job reaches completions")
+		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
+		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
+	})
+
 	ginkgo.It("should not create pods when created in suspend state", func() {
 		ginkgo.By("Creating a job with suspend=true")
 		job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
@@ -215,7 +302,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		ginkgo.By("Updating the job with suspend=true")
@@ -302,7 +389,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		ginkgo.By("Delete the job")
@@ -382,7 +469,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensuring active pods == parallelism")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("delete a job")
@@ -412,7 +499,7 @@ var _ = SIGDescribe("Job", func() {
 		job.Kind = kind
 
 		ginkgo.By("Ensuring active pods == parallelism")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Orphaning one of the Job's Pods")
@@ -541,7 +628,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		// /status subresource operations
diff --git a/test/e2e/framework/job/fixtures.go b/test/e2e/framework/job/fixtures.go
index d07286d3151..ca0f6af8b28 100644
--- a/test/e2e/framework/job/fixtures.go
+++ b/test/e2e/framework/job/fixtures.go
@@ -103,17 +103,30 @@ func NewTestJobOnNode(behavior, name string, rPol v1.RestartPolicy, parallelism,
 		// If RestartPolicy is Never, the nodeName should be set to
 		// ensure all job pods run on a single node and the volume
 		// will be mounted from a hostPath instead.
-		if len(nodeName) > 0 {
-			randomDir := "/tmp/job-e2e/" + rand.String(10)
-			hostPathType := v1.HostPathDirectoryOrCreate
-			job.Spec.Template.Spec.Volumes[0].VolumeSource = v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: randomDir, Type: &hostPathType}}
-			// Tests involving r/w operations on hostPath volume needs to run in
-			// privileged mode for SELinux enabled distro, while Windows platform
-			// neither supports nor needs privileged mode.
-			privileged := !framework.NodeOSDistroIs("windows")
-			job.Spec.Template.Spec.Containers[0].SecurityContext.Privileged = &privileged
-		}
+		setupHostPathDirectory(job)
 		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
+	case "notTerminateOnce":
+		// Do not terminate the 0-indexed pod in the first run; it
+		// succeeds the second time. Fail the non-0-indexed pods until
+		// the marker file is created by the 0-indexed pod. The fact that
+		// the non-0-indexed pods have succeeded is used to determine that the
+		// 0-indexed pod has already created the marker file.
+		setupHostPathDirectory(job)
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; elif [[ $JOB_COMPLETION_INDEX -eq 0 ]] ; then touch /data/foo ; sleep 1000000 ; else exit 1 ; fi"}
 	}
 	return job
 }
+
+// setupHostPathDirectory sets up a host path directory used to pass information between pod restarts
+func setupHostPathDirectory(job *batchv1.Job) {
+	if len(job.Spec.Template.Spec.NodeName) > 0 {
+		randomDir := "/tmp/job-e2e/" + rand.String(10)
+		hostPathType := v1.HostPathDirectoryOrCreate
+		job.Spec.Template.Spec.Volumes[0].VolumeSource = v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: randomDir, Type: &hostPathType}}
+		// Tests involving r/w operations on hostPath volume need to run in
+		// privileged mode for SELinux enabled distros, while Windows platform
+		// neither supports nor needs privileged mode.
+		privileged := !framework.NodeOSDistroIs("windows")
+		job.Spec.Template.Spec.Containers[0].SecurityContext.Privileged = &privileged
+	}
+}
diff --git a/test/e2e/framework/job/rest.go b/test/e2e/framework/job/rest.go
index f653c34cc8a..0d3f38a700f 100644
--- a/test/e2e/framework/job/rest.go
+++ b/test/e2e/framework/job/rest.go
@@ -31,6 +31,21 @@ func GetJob(c clientset.Interface, ns, name string) (*batchv1.Job, error) {
 	return c.BatchV1().Jobs(ns).Get(context.TODO(), name, metav1.GetOptions{})
 }
 
+// GetAllRunningJobPods returns a list of all running Pods belonging to a Job.
+func GetAllRunningJobPods(c clientset.Interface, ns, jobName string) ([]v1.Pod, error) {
+	if podList, err := GetJobPods(c, ns, jobName); err != nil {
+		return nil, err
+	} else {
+		pods := []v1.Pod{}
+		for _, pod := range podList.Items {
+			if pod.Status.Phase == v1.PodRunning {
+				pods = append(pods, pod)
+			}
+		}
+		return pods, nil
+	}
+}
+
 // GetJobPods returns a list of Pods belonging to a Job.
 func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) {
 	label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))
diff --git a/test/e2e/framework/job/wait.go b/test/e2e/framework/job/wait.go
index de7fa9257d2..13e1f63f63e 100644
--- a/test/e2e/framework/job/wait.go
+++ b/test/e2e/framework/job/wait.go
@@ -29,9 +29,19 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
-// WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
+// WaitForJobPodsRunning waits for all pods for the Job named JobName in namespace ns to become Running. Only use
 // when pods will run for a long time, or it will be racy.
-func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
+func WaitForJobPodsRunning(c clientset.Interface, ns, jobName string, expectedCount int32) error {
+	return waitForJobPodsInPhase(c, ns, jobName, expectedCount, v1.PodRunning)
+}
+
+// WaitForJobPodsSucceeded waits for all pods for the Job named JobName in namespace ns to become Succeeded.
+func WaitForJobPodsSucceeded(c clientset.Interface, ns, jobName string, expectedCount int32) error {
+	return waitForJobPodsInPhase(c, ns, jobName, expectedCount, v1.PodSucceeded)
+}
+
+// waitForJobPodsInPhase waits for all pods for the Job named JobName in namespace ns to be in a given phase.
+func waitForJobPodsInPhase(c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase) error {
 	return wait.Poll(framework.Poll, JobTimeout, func() (bool, error) {
 		pods, err := GetJobPods(c, ns, jobName)
 		if err != nil {
@@ -39,11 +49,11 @@ func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
 		}
 		count := int32(0)
 		for _, p := range pods.Items {
-			if p.Status.Phase == v1.PodRunning {
+			if p.Status.Phase == phase {
 				count++
 			}
 		}
-		return count == parallelism, nil
+		return count == expectedCount, nil
 	})
 }
 
diff --git a/test/e2e/scheduling/nvidia-gpus.go b/test/e2e/scheduling/nvidia-gpus.go
index 298dc41732c..6e6f8201ceb 100644
--- a/test/e2e/scheduling/nvidia-gpus.go
+++ b/test/e2e/scheduling/nvidia-gpus.go
@@ -240,7 +240,7 @@ func testNvidiaGPUsJob(f *framework.Framework) {
 	framework.ExpectNoError(err)
 
 	// make sure job is running by waiting for its first pod to start running
-	err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
+	err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
 	framework.ExpectNoError(err)
 
 	numNodes, err := e2enode.TotalRegistered(f.ClientSet)
diff --git a/test/e2e/upgrades/apps/job.go b/test/e2e/upgrades/apps/job.go
index 53c554b76d4..c922989b575 100644
--- a/test/e2e/upgrades/apps/job.go
+++ b/test/e2e/upgrades/apps/job.go
@@ -53,7 +53,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
 	framework.ExpectNoError(err)
 
 	ginkgo.By("Ensuring active pods == parallelism")
-	err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
+	err = e2ejob.WaitForJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
 	framework.ExpectNoError(err)
 }