improve the code in test/e2e/framework/job/wait.go

This commit is contained in:
carlory 2024-11-04 11:59:54 +08:00
parent 6cb5ea56cb
commit f78c903537

View File

@ -26,7 +26,6 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -60,19 +59,29 @@ func WaitForJobPodsSucceeded(ctx context.Context, c clientset.Interface, ns, job
// waitForJobPodsInPhase wait for all pods for the Job named JobName in namespace ns to be in a given phase. // waitForJobPodsInPhase wait for all pods for the Job named JobName in namespace ns to be in a given phase.
func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase, timeout time.Duration) error { func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase, timeout time.Duration) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, false, func(ctx context.Context) (bool, error) { get := func(ctx context.Context) (*v1.PodList, error) {
pods, err := GetJobPods(ctx, c, ns, jobName) return GetJobPods(ctx, c, ns, jobName)
if err != nil { }
return false, err match := func(pods *v1.PodList) (func() string, error) {
}
count := int32(0) count := int32(0)
for _, p := range pods.Items { for _, p := range pods.Items {
if p.Status.Phase == phase { if p.Status.Phase == phase {
count++ count++
} }
} }
return count == expectedCount, nil if count == expectedCount {
}) return nil, nil
}
return func() string {
return fmt.Sprintf("job %q expected %d pods in %q phase, but got %d:\n%s",
klog.KRef(ns, jobName), expectedCount, phase, count, format.Object(pods, 1))
}, nil
}
return framework.Gomega().
Eventually(ctx, framework.HandleRetry(get)).
WithPolling(framework.Poll).
WithTimeout(timeout).
Should(framework.MakeMatcher(match))
} }
// WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns. // WaitForJobComplete uses c to wait for completions to complete for the Job jobName in namespace ns.
@ -196,14 +205,18 @@ func WaitForJobFinish(ctx context.Context, c clientset.Interface, ns, jobName st
// WaitForJobFinishWithTimeout uses c to wait for the Job jobName in namespace ns to finish (either Failed or Complete). // WaitForJobFinishWithTimeout uses c to wait for the Job jobName in namespace ns to finish (either Failed or Complete).
func WaitForJobFinishWithTimeout(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration) error { func WaitForJobFinishWithTimeout(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) { return framework.Gomega().
curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) Eventually(ctx, framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{})).
if err != nil { WithPolling(framework.Poll).
return false, err WithTimeout(timeout).
} Should(framework.MakeMatcher(func(job *batchv1.Job) (func() string, error) {
if isJobFinished(job) {
return isJobFinished(curr), nil return nil, nil
}) }
return func() string {
return fmt.Sprintf("expected job %q to be finished\n%s", klog.KObj(job), format.Object(job, 1))
}, nil
}))
} }
func isJobFinished(j *batchv1.Job) bool { func isJobFinished(j *batchv1.Job) bool {
@ -230,32 +243,33 @@ func isConditionTrue(j *batchv1.Job, condition batchv1.JobConditionType) bool {
// WaitForJobGone uses c to wait for up to timeout for the Job named jobName in namespace ns to be removed. // WaitForJobGone uses c to wait for up to timeout for the Job named jobName in namespace ns to be removed.
func WaitForJobGone(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration) error { func WaitForJobGone(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, false, func(ctx context.Context) (bool, error) { return framework.Gomega().
_, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{}) Eventually(ctx, func(ctx context.Context) error {
if apierrors.IsNotFound(err) { _, err := framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{})(ctx)
return true, nil return err
} }).
return false, err WithPolling(framework.Poll).
}) WithTimeout(timeout).
Should(gomega.MatchError(apierrors.IsNotFound, fmt.Sprintf("that expected job %q to be gone", klog.KRef(ns, jobName))))
} }
// WaitForAllJobPodsGone waits for all pods for the Job named jobName in namespace ns // WaitForAllJobPodsGone waits for all pods for the Job named jobName in namespace ns
// to be deleted. // to be deleted.
func WaitForAllJobPodsGone(ctx context.Context, c clientset.Interface, ns, jobName string) error { func WaitForAllJobPodsGone(ctx context.Context, c clientset.Interface, ns, jobName string) error {
return wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, true, func(ctx context.Context) (bool, error) { get := func(ctx context.Context) (*v1.PodList, error) {
pods, err := GetJobPods(ctx, c, ns, jobName) return GetJobPods(ctx, c, ns, jobName)
if err != nil { }
return false, err return framework.Gomega().
} Eventually(ctx, framework.HandleRetry(get)).
return len(pods.Items) == 0, nil WithPolling(framework.Poll).
}) WithTimeout(JobTimeout).
Should(gomega.HaveField("Items", gomega.BeEmpty()))
} }
// WaitForJobState waits for a job to be matched to the given condition. // WaitForJobState waits for a job to be matched to the given state function.
// The condition callback may use gomega.StopTrying to abort early.
func WaitForJobState(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, state JobState) error { func WaitForJobState(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, state JobState) error {
return framework.Gomega(). return framework.Gomega().
Eventually(ctx, framework.RetryNotFound(framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{}))). Eventually(ctx, framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{})).
WithTimeout(timeout). WithTimeout(timeout).
Should(framework.MakeMatcher(func(job *batchv1.Job) (func() string, error) { Should(framework.MakeMatcher(func(job *batchv1.Job) (func() string, error) {
matches := state(job) matches := state(job)