Merge pull request #127695 from kaisoz/wait-for-job-failfast

Fail fast when waiting for job conditions in e2e tests
This commit is contained in:
Kubernetes Prow Robot 2024-10-10 22:28:19 +01:00 committed by GitHub
commit c15581b277
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 39 deletions

View File

@ -137,7 +137,7 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring job fails")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
err = e2ejob.WaitForJobFailed(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)
})
@ -605,7 +605,7 @@ done`}
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as there are failed indexes")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
err = e2ejob.WaitForJobFailed(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Verifying the Job status fields to ensure all indexes were executed")
@ -641,7 +641,7 @@ done`}
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as the number of max failed indexes is exceeded")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
err = e2ejob.WaitForJobFailed(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Verifying the Job status fields to ensure early termination of the job")
@ -684,7 +684,7 @@ done`}
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as all indexes are failed")
err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
err = e2ejob.WaitForJobFailed(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Verifying the Job status fields to ensure the upper indexes didn't execute")

View File

@ -21,12 +21,14 @@ import (
"fmt"
"time"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/format"
"k8s.io/utils/ptr"
@ -82,14 +84,33 @@ func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobNa
// both conformance CI jobs with GA-only features and e2e CI jobs with all default-enabled features.
// So, we need to skip "Complete" condition reason verifications in the e2e conformance test cases.
func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, reason *string, completions int32) error {
if err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
// This function is called by HandleRetry, which will retry
// on transient API errors or stop polling in the case of other errors.
get := func(ctx context.Context) (*batchv1.Job, error) {
job, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
return nil, err
}
return curr.Status.Succeeded == completions, nil
}); err != nil {
return nil
if isJobFailed(job) {
return nil, gomega.StopTrying("job failed while waiting for its completion").Attach("job", job)
}
return job, nil
}
match := func(job *batchv1.Job) (func() string, error) {
if job.Status.Succeeded == completions {
return nil, nil
}
return func() string {
return fmt.Sprintf("expected job %q to have %v successful pods. got %v", klog.KObj(job), completions, job.Status.Succeeded)
}, nil
}
err := framework.Gomega().
Eventually(ctx, framework.HandleRetry(get)).
WithTimeout(JobTimeout).
WithPolling(framework.Poll).
Should(framework.MakeMatcher(match))
if err != nil {
return err
}
return WaitForJobCondition(ctx, c, ns, jobName, batchv1.JobComplete, reason)
}
@ -117,48 +138,55 @@ func WaitForJobSuspend(ctx context.Context, c clientset.Interface, ns, jobName s
}
// WaitForJobFailed uses c to wait for the Job jobName in namespace ns to fail
func WaitForJobFailed(c clientset.Interface, ns, jobName string) error {
return wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(context.TODO(), jobName, metav1.GetOptions{})
func WaitForJobFailed(ctx context.Context, c clientset.Interface, ns, jobName string) error {
// This function is called by HandleRetry, which will retry
// on transient API errors or stop polling in the case of other errors.
get := func(ctx context.Context) (*batchv1.Job, error) {
job, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
return nil, err
}
return isJobFailed(curr), nil
})
if isJobCompleted(job) {
return nil, gomega.StopTrying("job completed while waiting for its failure").Attach("job", job)
}
return job, nil
}
match := func(job *batchv1.Job) (func() string, error) {
if isJobFailed(job) {
return nil, nil
}
return func() string {
return fmt.Sprintf("expected job %q to fail", klog.KObj(job))
}, nil
}
return framework.Gomega().
Eventually(ctx, framework.HandleRetry(get)).
WithTimeout(JobTimeout).
WithPolling(framework.Poll).
Should(framework.MakeMatcher(match))
}
// WaitForJobCondition waits for the specified Job to have the expected condition with the specific reason.
// When the nil reason is passed, the "reason" string in the condition is
// not checked.
func WaitForJobCondition(ctx context.Context, c clientset.Interface, ns, jobName string, cType batchv1.JobConditionType, reason *string) error {
err := wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, false, func(ctx context.Context) (bool, error) {
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, c := range curr.Status.Conditions {
match := func(job *batchv1.Job) (func() string, error) {
for _, c := range job.Status.Conditions {
if c.Type == cType && c.Status == v1.ConditionTrue {
if reason == nil || *reason == c.Reason {
return true, nil
return nil, nil
}
}
}
return false, nil
})
if err != nil {
return fmt.Errorf("waiting for Job %q to have the condition %q with reason: %v: %w", jobName, cType, reason, err)
return func() string {
return fmt.Sprintf("expected job %q to reach the expected condition %q with reason %q", klog.KObj(job), cType, ptr.Deref(reason, "<nil>"))
}, nil
}
return nil
}
func isJobFailed(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
return true
}
}
return false
return framework.Gomega().
Eventually(ctx, framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{})).
WithTimeout(JobTimeout).
WithPolling(framework.Poll).
Should(framework.MakeMatcher(match))
}
// WaitForJobFinish uses c to wait for the Job jobName in namespace ns to finish (either Failed or Complete).
@ -179,8 +207,20 @@ func WaitForJobFinishWithTimeout(ctx context.Context, c clientset.Interface, ns,
}
// isJobFinished reports whether the Job is finished, i.e. whether either
// isJobCompleted or isJobFailed holds for it.
func isJobFinished(job *batchv1.Job) bool {
	if isJobCompleted(job) {
		return true
	}
	return isJobFailed(job)
}
// isJobFailed reports whether isConditionTrue holds for the Job's
// batchv1.JobFailed condition.
func isJobFailed(job *batchv1.Job) bool {
	failed := isConditionTrue(job, batchv1.JobFailed)
	return failed
}
// isJobCompleted reports whether isConditionTrue holds for the Job's
// batchv1.JobComplete condition.
func isJobCompleted(job *batchv1.Job) bool {
	completed := isConditionTrue(job, batchv1.JobComplete)
	return completed
}
func isConditionTrue(j *batchv1.Job, condition batchv1.JobConditionType) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
if c.Type == condition && c.Status == v1.ConditionTrue {
return true
}
}