Mirror of https://github.com/k3s-io/kubernetes.git
Add e2e test for job pod failure policy used to match pod disruption
commit 41285a7c91
parent 77b027936a

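The feature under test lets a Job's pod failure policy ignore pod failures caused by disruptions (signalled by the DisruptionTarget pod condition) instead of counting them towards backoffLimit. As a self-contained illustration of the policy shape exercised by the new test (this sketch is not part of the commit; it only uses the batch/v1 and core/v1 types that appear in the diff below):

package main

import (
	"encoding/json"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

func main() {
	// Ignore container failures with exit code 1 and any failure caused by a
	// disruption (DisruptionTarget condition); everything else still counts
	// towards the Job's backoffLimit.
	policy := &batchv1.PodFailurePolicy{
		Rules: []batchv1.PodFailurePolicyRule{
			{
				Action: batchv1.PodFailurePolicyActionIgnore,
				OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
					Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
					Values:   []int32{1},
				},
			},
			{
				Action: batchv1.PodFailurePolicyActionIgnore,
				OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
					{Type: v1.DisruptionTarget, Status: v1.ConditionTrue},
				},
			},
		},
	}
	out, _ := json.MarshalIndent(policy, "", "  ")
	fmt.Println(string(out))
}
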
@@ -25,6 +25,7 @@ import (
 
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -169,6 +170,92 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
 	})
 
+	// This test is using an indexed job. The pod corresponding to the 0th index
+	// creates a marker file on the host and runs 'forever' until evicted. We use
+	// the non-0-indexed pods to determine if the marker file is already
+	// created by the 0th indexed pod - the non-0-indexed pods fail and restart
+	// until the marker file is created (their potential failures are ignored
+	// based on the exit code). Once the marker file is created the 0th indexed
+	// pod is evicted (DisruptionTarget condition is added in the process),
+	// after restart it runs to successful completion.
+	// Steps:
+	// 1. Select a node to run all Job's pods to ensure the host marker file is accessible by all pods
+	// 2. Create the indexed job
+	// 3. Await for all non-0-indexed pods to succeed to ensure the marker file is created by the 0-indexed pod
+	// 4. Make sure the 0-indexed pod is running
+	// 5. Evict the 0-indexed pod
+	// 6. Await for the job to successfully complete
+	ginkgo.It("should allow to use the pod failure policy to not count pod disruption towards the backoffLimit", func() {
+		mode := batchv1.IndexedCompletion
+
+		// We set the backoffLimit to 0 so that any pod failure would trigger
+		// job failure if not for the pod failure policy to ignore the failed
+		// pods from counting them towards the backoffLimit.
+		backoffLimit := int32(0)
+
+		ginkgo.By("Looking for a node to schedule job pods")
+		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Creating a job")
+		job := e2ejob.NewTestJobOnNode("notTerminateOnce", "pod-disruption-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
+		job.Spec.CompletionMode = &mode
+		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
+			Rules: []batchv1.PodFailurePolicyRule{
+				{
+					// Ignore failures of the non 0-indexed pods which fail until the marker file is created
+					Action: batchv1.PodFailurePolicyActionIgnore,
+					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
+						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
+						Values:   []int32{1},
+					},
+				},
+				{
+					// Ignore the pod failure caused by the eviction
+					Action: batchv1.PodFailurePolicyActionIgnore,
+					OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
+						{
+							Type:   v1.DisruptionTarget,
+							Status: v1.ConditionTrue,
+						},
+					},
+				},
+			},
+		}
+		job, err = e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Awaiting for all non 0-indexed pods to succeed to ensure the marker file is created")
+		err = e2ejob.WaitForJobPodsSucceeded(f.ClientSet, f.Namespace.Name, job.Name, completions-1)
+		framework.ExpectNoError(err, "failed to await for all non 0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace)
+
+		ginkgo.By("Awaiting for the 0-indexed pod to be running")
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
+		framework.ExpectNoError(err, "failed to await for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)
+
+		pods, err := e2ejob.GetAllRunningJobPods(f.ClientSet, f.Namespace.Name, job.Name)
+		framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
+		framework.ExpectEqual(len(pods), 1, "Exactly one running pod is expected")
+		pod := pods[0]
+		ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
+		evictTarget := &policyv1.Eviction{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			},
+		}
+		f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(context.TODO(), evictTarget)
+		framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)
+
+		ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
+		err = e2epod.WaitForPodNotFoundInNamespace(f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
+		framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)
+
+		ginkgo.By("Ensuring job reaches completions")
+		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
+		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
+	})
+
 	ginkgo.It("should not create pods when created in suspend state", func() {
 		ginkgo.By("Creating a job with suspend=true")
 		job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)

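The second Ignore rule in the new test matches on the DisruptionTarget pod condition added when the 0-indexed pod is evicted. A small standalone sketch (not part of the commit) of what that rule effectively checks on a failed pod:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// hasDisruptionTarget reports whether a pod carries a DisruptionTarget
// condition with status True, i.e. its failure was caused by a disruption
// such as an API-initiated eviction. This mirrors what the OnPodConditions
// rule above matches; the helper itself is only an illustration.
func hasDisruptionTarget(pod *v1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.DisruptionTarget && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	evicted := &v1.Pod{Status: v1.PodStatus{
		Phase:      v1.PodFailed,
		Conditions: []v1.PodCondition{{Type: v1.DisruptionTarget, Status: v1.ConditionTrue}},
	}}
	fmt.Println(hasDisruptionTarget(evicted)) // prints: true
}
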
@@ -215,7 +302,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		ginkgo.By("Updating the job with suspend=true")

@@ -302,7 +389,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		ginkgo.By("Delete the job")

@@ -382,7 +469,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensuring active pods == parallelism")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("delete a job")

@@ -412,7 +499,7 @@ var _ = SIGDescribe("Job", func() {
 		job.Kind = kind
 
 		ginkgo.By("Ensuring active pods == parallelism")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Orphaning one of the Job's Pods")

@@ -541,7 +628,7 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
 
 		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
-		err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
+		err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, parallelism)
 		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)
 
 		// /status subresource operations

@@ -103,7 +103,23 @@ func NewTestJobOnNode(behavior, name string, rPol v1.RestartPolicy, parallelism,
 		// If RestartPolicy is Never, the nodeName should be set to
 		// ensure all job pods run on a single node and the volume
 		// will be mounted from a hostPath instead.
-		if len(nodeName) > 0 {
+		setupHostPathDirectory(job)
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
+	case "notTerminateOnce":
+		// Do not terminate the 0-indexed pod in the first run and
+		// succeed the second time. Fail the non-0-indexed pods until
+		// the marker file is created by the 0-indexed pod. The fact that
+		// the non-0-indexed pods are succeeded is used to determine that the
+		// 0th indexed pod already created the marker file.
+		setupHostPathDirectory(job)
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; elif [[ $JOB_COMPLETION_INDEX -eq 0 ]] ; then touch /data/foo ; sleep 1000000 ; else exit 1 ; fi"}
+	}
+	return job
+}
+
+// setup host path directory to pass information between pod restarts
+func setupHostPathDirectory(job *batchv1.Job) {
+	if len(job.Spec.Template.Spec.NodeName) > 0 {
 		randomDir := "/tmp/job-e2e/" + rand.String(10)
 		hostPathType := v1.HostPathDirectoryOrCreate
 		job.Spec.Template.Spec.Volumes[0].VolumeSource = v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: randomDir, Type: &hostPathType}}

@@ -113,7 +129,4 @@ func NewTestJobOnNode(behavior, name string, rPol v1.RestartPolicy, parallelism,
 		privileged := !framework.NodeOSDistroIs("windows")
 		job.Spec.Template.Spec.Containers[0].SecurityContext.Privileged = &privileged
 	}
-	job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
-	}
-	return job
 }

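The one-line container command added for the "notTerminateOnce" behavior encodes the per-index logic described in the comments above. The same script, reformatted here purely for readability (a sketch; the commit keeps the single-line form shown in the hunk):

package main

import "fmt"

// notTerminateOnceScript restates the "notTerminateOnce" container command
// from the hunk above with comments; the behavior is unchanged.
const notTerminateOnceScript = `
if [[ -r /data/foo ]] ; then
	exit 0                           # marker file exists: every index succeeds
elif [[ $JOB_COMPLETION_INDEX -eq 0 ]] ; then
	touch /data/foo ; sleep 1000000  # index 0 creates the marker and runs "forever" until evicted
else
	exit 1                           # other indexes fail (ignored via the exit-code rule) until the marker exists
fi`

func main() { fmt.Println(notTerminateOnceScript) }
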
@@ -31,6 +31,21 @@ func GetJob(c clientset.Interface, ns, name string) (*batchv1.Job, error) {
 	return c.BatchV1().Jobs(ns).Get(context.TODO(), name, metav1.GetOptions{})
 }
 
+// GetAllRunningJobPods returns a list of all running Pods belonging to a Job.
+func GetAllRunningJobPods(c clientset.Interface, ns, jobName string) ([]v1.Pod, error) {
+	if podList, err := GetJobPods(c, ns, jobName); err != nil {
+		return nil, err
+	} else {
+		pods := []v1.Pod{}
+		for _, pod := range podList.Items {
+			if pod.Status.Phase == v1.PodRunning {
+				pods = append(pods, pod)
+			}
+		}
+		return pods, nil
+	}
+}
+
 // GetJobPods returns a list of Pods belonging to a Job.
 func GetJobPods(c clientset.Interface, ns, jobName string) (*v1.PodList, error) {
 	label := labels.SelectorFromSet(labels.Set(map[string]string{JobSelectorKey: jobName}))

@@ -29,9 +29,19 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
-// WaitForAllJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
+// WaitForJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running. Only use
 // when pods will run for a long time, or it will be racy.
-func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) error {
+func WaitForJobPodsRunning(c clientset.Interface, ns, jobName string, expectedCount int32) error {
+	return waitForJobPodsInPhase(c, ns, jobName, expectedCount, v1.PodRunning)
+}
+
+// WaitForJobPodsSucceeded wait for all pods for the Job named JobName in namespace ns to become Succeeded.
+func WaitForJobPodsSucceeded(c clientset.Interface, ns, jobName string, expectedCount int32) error {
+	return waitForJobPodsInPhase(c, ns, jobName, expectedCount, v1.PodSucceeded)
+}
+
+// waitForJobPodsInPhase wait for all pods for the Job named JobName in namespace ns to be in a given phase.
+func waitForJobPodsInPhase(c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase) error {
 	return wait.Poll(framework.Poll, JobTimeout, func() (bool, error) {
 		pods, err := GetJobPods(c, ns, jobName)
 		if err != nil {

@@ -39,11 +49,11 @@ func WaitForAllJobPodsRunning(c clientset.Interface, ns, jobName string, paralle
 		}
 		count := int32(0)
 		for _, p := range pods.Items {
-			if p.Status.Phase == v1.PodRunning {
+			if p.Status.Phase == phase {
 				count++
 			}
 		}
-		return count == parallelism, nil
+		return count == expectedCount, nil
 	})
 }
 

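Both exported wait helpers now delegate to a single phase-count poll. The underlying pattern, sketched standalone with the apimachinery wait package (illustrative only; waitForCount and countInPhase are stand-in names, not framework APIs):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForCount polls until countInPhase reports the expected number, the same
// shape as waitForJobPodsInPhase above: count pods in a phase, compare, retry.
func waitForCount(countInPhase func() (int32, error), expected int32, interval, timeout time.Duration) error {
	return wait.Poll(interval, timeout, func() (bool, error) {
		n, err := countInPhase()
		if err != nil {
			return false, err
		}
		return n == expected, nil
	})
}

func main() {
	seen := int32(0)
	err := waitForCount(func() (int32, error) {
		seen++ // pretend one more pod reaches the target phase on each poll
		return seen, nil
	}, 3, 10*time.Millisecond, time.Second)
	fmt.Println("reached expected count:", err == nil)
}
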
@@ -240,7 +240,7 @@ func testNvidiaGPUsJob(f *framework.Framework) {
 	framework.ExpectNoError(err)
 
 	// make sure job is running by waiting for its first pod to start running
-	err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
+	err = e2ejob.WaitForJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
 	framework.ExpectNoError(err)
 
 	numNodes, err := e2enode.TotalRegistered(f.ClientSet)

@@ -53,7 +53,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
 	framework.ExpectNoError(err)
 
 	ginkgo.By("Ensuring active pods == parallelism")
-	err = e2ejob.WaitForAllJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
+	err = e2ejob.WaitForJobPodsRunning(f.ClientSet, t.namespace, job.Name, 2)
 	framework.ExpectNoError(err)
 }
 