From 07c34eb40049fb073506adec3df58b4abc358519 Mon Sep 17 00:00:00 2001 From: Tim Allclair Date: Thu, 5 May 2022 01:42:32 -0700 Subject: [PATCH] [E2E] Refactor pod polling functions (WaitForX) (#109704) * Clean up WaitFor Pod functions * Handle retryable errors when polling * Log more context on timeout * #squash Address PR feedback --- test/e2e/framework/pod/resource.go | 140 +-------- test/e2e/framework/pod/wait.go | 449 +++++++++++++++++++---------- 2 files changed, 310 insertions(+), 279 deletions(-) diff --git a/test/e2e/framework/pod/resource.go b/test/e2e/framework/pod/resource.go index af05512cc14..1bffc63de19 100644 --- a/test/e2e/framework/pod/resource.go +++ b/test/e2e/framework/pod/resource.go @@ -31,10 +31,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - "k8s.io/kubectl/pkg/util/podutils" e2elog "k8s.io/kubernetes/test/e2e/framework/log" testutils "k8s.io/kubernetes/test/utils" @@ -155,70 +153,6 @@ func (r ProxyResponseChecker) CheckAllResponses() (done bool, err error) { return true, nil } -func podRunning(c clientset.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - switch pod.Status.Phase { - case v1.PodRunning: - return true, nil - case v1.PodFailed, v1.PodSucceeded: - return false, errPodCompleted - } - return false, nil - } -} - -func podCompleted(c clientset.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - switch pod.Status.Phase { - case v1.PodFailed, v1.PodSucceeded: - return true, nil - } - return false, nil - } -} - -func 
podRunningAndReady(c clientset.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - switch pod.Status.Phase { - case v1.PodFailed, v1.PodSucceeded: - e2elog.Logf("The status of Pod %s is %s which is unexpected, pod status: %#v", podName, pod.Status.Phase, pod.Status) - return false, errPodCompleted - case v1.PodRunning: - e2elog.Logf("The status of Pod %s is %s (Ready = %v)", podName, pod.Status.Phase, podutils.IsPodReady(pod)) - return podutils.IsPodReady(pod), nil - } - e2elog.Logf("The status of Pod %s is %s, waiting for it to be Running (with Ready = true)", podName, pod.Status.Phase) - return false, nil - } -} - -func podNotPending(c clientset.Interface, podName, namespace string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - switch pod.Status.Phase { - case v1.PodPending: - return false, nil - default: - return true, nil - } - } -} - // PodsCreated returns a pod list matched by the given name. 
func PodsCreated(c clientset.Interface, ns, name string, replicas int32) (*v1.PodList, error) { label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) @@ -305,60 +239,6 @@ func podsRunning(c clientset.Interface, pods *v1.PodList) []error { return e } -func podContainerFailed(c clientset.Interface, namespace, podName string, containerIndex int, reason string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - switch pod.Status.Phase { - case v1.PodPending: - if len(pod.Status.ContainerStatuses) == 0 { - return false, nil - } - containerStatus := pod.Status.ContainerStatuses[containerIndex] - if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason { - return true, nil - } - return false, nil - case v1.PodFailed, v1.PodRunning, v1.PodSucceeded: - return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase) - } - return false, nil - } -} - -func podContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - if containerIndex > len(pod.Status.ContainerStatuses)-1 { - return false, nil - } - containerStatus := pod.Status.ContainerStatuses[containerIndex] - return *containerStatus.Started, nil - } -} - -func isContainerRunning(c clientset.Interface, namespace, podName, containerName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - for _, statuses := range [][]v1.ContainerStatus{pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses} { - 
for _, cs := range statuses { - if cs.Name == containerName { - return cs.State.Running != nil, nil - } - } - } - return false, nil - } -} - // LogPodStates logs basic info of provided pods for debugging. func LogPodStates(pods []v1.Pod) { // Find maximum widths for pod, node, and phase strings for column printing. @@ -565,13 +445,7 @@ func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tw } execPod, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{}) expectNoError(err, "failed to create new exec pod in namespace: %s", ns) - err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) { - retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(context.TODO(), execPod.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - return retrievedPod.Status.Phase == v1.PodRunning, nil - }) + err = WaitForPodNameRunningInNamespace(client, execPod.Name, execPod.Namespace) expectNoError(err, "failed to create new exec pod in namespace: %s", ns) return execPod } @@ -769,3 +643,15 @@ func IsPodActive(p *v1.Pod) bool { v1.PodFailed != p.Status.Phase && p.DeletionTimestamp == nil } + +func podIdentifier(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +func identifier(pod *v1.Pod) string { + id := podIdentifier(pod.Namespace, pod.Name) + if pod.UID != "" { + id += fmt.Sprintf("(%s)", pod.UID) + } + return id +} diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go index ede53c96e4d..f3ebc56a922 100644 --- a/test/e2e/framework/pod/wait.go +++ b/test/e2e/framework/pod/wait.go @@ -25,6 +25,7 @@ import ( "time" "github.com/onsi/ginkgo" + "github.com/onsi/gomega/format" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -65,31 +66,71 @@ const ( type podCondition func(pod *v1.Pod) (bool, error) -// errorBadPodsStates create error message of basic info of bad pods for debugging. 
-func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) string { - errStr := fmt.Sprintf("%d / %d pods in namespace %q are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout) - if err != nil { - errStr += fmt.Sprintf("Last error: %s\n", err) +type timeoutError struct { + msg string +} + +func (e *timeoutError) Error() string { + return e.msg +} + +func TimeoutError(format string, args ...interface{}) *timeoutError { + return &timeoutError{ + msg: fmt.Sprintf(format, args...), } +} + +// maybeTimeoutError returns a TimeoutError if err is a timeout. Otherwise, wrap err. +// taskFormat and taskArgs should be the task being performed when the error occurred, +// e.g. "waiting for pod to be running". +func maybeTimeoutError(err error, taskFormat string, taskArgs ...interface{}) error { + if IsTimeout(err) { + return TimeoutError("timed out while "+taskFormat, taskArgs...) + } else if err != nil { + return fmt.Errorf("error while %s: %w", fmt.Sprintf(taskFormat, taskArgs...), err) + } else { + return nil + } +} + +func IsTimeout(err error) bool { + if err == wait.ErrWaitTimeout { + return true + } + if _, ok := err.(*timeoutError); ok { + return true + } + return false +} + +// errorBadPodsStates create error message of basic info of bad pods for debugging. +func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) error { + errStr := fmt.Sprintf("%d / %d pods in namespace %s are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout) + // Print bad pods info only if there are fewer than 10 bad pods if len(badPods) > 10 { - return errStr + "There are too many bad pods. Please check log for details." + errStr += "There are too many bad pods. Please check log for details." 
+ } else { + buf := bytes.NewBuffer(nil) + w := tabwriter.NewWriter(buf, 0, 0, 1, ' ', 0) + fmt.Fprintln(w, "POD\tNODE\tPHASE\tGRACE\tCONDITIONS") + for _, badPod := range badPods { + grace := "" + if badPod.DeletionGracePeriodSeconds != nil { + grace = fmt.Sprintf("%ds", *badPod.DeletionGracePeriodSeconds) + } + podInfo := fmt.Sprintf("%s\t%s\t%s\t%s\t%+v", + badPod.ObjectMeta.Name, badPod.Spec.NodeName, badPod.Status.Phase, grace, badPod.Status.Conditions) + fmt.Fprintln(w, podInfo) + } + w.Flush() + errStr += buf.String() } - buf := bytes.NewBuffer(nil) - w := tabwriter.NewWriter(buf, 0, 0, 1, ' ', 0) - fmt.Fprintln(w, "POD\tNODE\tPHASE\tGRACE\tCONDITIONS") - for _, badPod := range badPods { - grace := "" - if badPod.DeletionGracePeriodSeconds != nil { - grace = fmt.Sprintf("%ds", *badPod.DeletionGracePeriodSeconds) - } - podInfo := fmt.Sprintf("%s\t%s\t%s\t%s\t%+v", - badPod.ObjectMeta.Name, badPod.Spec.NodeName, badPod.Status.Phase, grace, badPod.Status.Conditions) - fmt.Fprintln(w, podInfo) + if err != nil && !IsTimeout(err) { + return fmt.Errorf("%s\nLast error: %w", errStr, err) } - w.Flush() - return errStr + buf.String() + return TimeoutError(errStr) } // WaitForPodsRunningReady waits up to timeout to ensure that all pods in @@ -129,10 +170,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN lastAPIError = nil rcList, err := c.CoreV1().ReplicationControllers(ns).List(context.TODO(), metav1.ListOptions{}) + lastAPIError = err if err != nil { - e2elog.Logf("Error getting replication controllers in namespace '%s': %v", ns, err) - lastAPIError = err - return false, err + return handleWaitingAPIError(err, false, "listing replication controllers in namespace %s", ns) } for _, rc := range rcList.Items { replicas += *rc.Spec.Replicas @@ -140,10 +180,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN } rsList, err := c.AppsV1().ReplicaSets(ns).List(context.TODO(), metav1.ListOptions{}) + 
lastAPIError = err if err != nil { - lastAPIError = err - e2elog.Logf("Error getting replication sets in namespace %q: %v", ns, err) - return false, err + return handleWaitingAPIError(err, false, "listing replication sets in namespace %s", ns) } for _, rs := range rsList.Items { replicas += *rs.Spec.Replicas @@ -151,10 +190,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN } podList, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) + lastAPIError = err if err != nil { - lastAPIError = err - e2elog.Logf("Error getting pods in namespace '%s': %v", ns, err) - return false, err + return handleWaitingAPIError(err, false, "listing pods in namespace %s", ns) } nOk := int32(0) notReady = int32(0) @@ -197,7 +235,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN return false, nil }) != nil { if !ignoreNotReady { - return errors.New(errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError)) + return errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError) } e2elog.Logf("Number of not-ready pods (%d) is below the allowed threshold (%d).", notReady, allowedNotReadyPods) } @@ -205,35 +243,83 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN } // WaitForPodCondition waits a pods to be matched to the given condition. 
-func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeout time.Duration, condition podCondition) error { - e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, desc) - var lastPodError error - for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { +func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc string, timeout time.Duration, condition podCondition) error { + e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, conditionDesc) + var ( + lastPodError error + lastPod *v1.Pod + start = time.Now() + ) + err := wait.PollImmediate(poll, timeout, func() (bool, error) { pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) lastPodError = err if err != nil { - if apierrors.IsNotFound(err) { - e2elog.Logf("Pod %q in namespace %q not found. Error: %v", podName, ns, err) - } else { - e2elog.Logf("Get pod %q in namespace %q failed, ignoring for %v. Error: %v", podName, ns, poll, err) - } - continue + return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName)) } + lastPod = pod // Don't overwrite if an error occurs after successfully retrieving. + // log now so that current pod info is reported before calling `condition()` e2elog.Logf("Pod %q: Phase=%q, Reason=%q, readiness=%t. Elapsed: %v", podName, pod.Status.Phase, pod.Status.Reason, podutils.IsPodReady(pod), time.Since(start)) if done, err := condition(pod); done { if err == nil { - e2elog.Logf("Pod %q satisfied condition %q", podName, desc) + e2elog.Logf("Pod %q satisfied condition %q", podName, conditionDesc) } - return err + return true, err + } else if err != nil { + // TODO(#109732): stop polling and return the error in this case. 
+ e2elog.Logf("Error evaluating pod condition %s: %v", conditionDesc, err) } + return false, nil + }) + if err == nil { + return nil } - if apierrors.IsNotFound(lastPodError) { - // return for compatbility with other functions testing for IsNotFound - return lastPodError + if IsTimeout(err) { + e2elog.Logf("Timed out while waiting for pod %s to be %s. Last observed as: %s", + podIdentifier(ns, podName), conditionDesc, format.Object(lastPod, 1)) } - return fmt.Errorf("Gave up after waiting %v for pod %q to be %q", timeout, podName, desc) + if lastPodError != nil { + // If the last API call was an error. + err = lastPodError + } + return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc) +} + +// WaitForAllPodsCondition waits for the listed pods to match the given condition. +// To succeed, at least minPods must be listed, and all listed pods must match the condition. +func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) { + e2elog.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc) + var pods *v1.PodList + matched := 0 + err := wait.PollImmediate(poll, timeout, func() (done bool, err error) { + pods, err = c.CoreV1().Pods(ns).List(context.TODO(), opts) + if err != nil { + return handleWaitingAPIError(err, true, "listing pods") + } + if len(pods.Items) < minPods { + e2elog.Logf("found %d pods, waiting for at least %d", len(pods.Items), minPods) + return false, nil + } + + nonMatchingPods := []string{} + for _, pod := range pods.Items { + done, err := condition(&pod) + if done && err != nil { + return false, fmt.Errorf("error evaluating pod %s: %w", identifier(&pod), err) + } + if !done { + nonMatchingPods = append(nonMatchingPods, identifier(&pod)) + } + } + matched = len(pods.Items) - len(nonMatchingPods) + if len(nonMatchingPods) <= 
0 { + return true, nil // All pods match. + } + e2elog.Logf("%d pods are not %s: %v", len(nonMatchingPods), conditionDesc, nonMatchingPods) + return false, nil + }) + return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched) } // WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate, @@ -242,7 +328,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeou // terminated (reason==""), but may be called to detect if a pod did *not* terminate according to // the supplied reason. func WaitForPodTerminatedInNamespace(c clientset.Interface, podName, reason, namespace string) error { - return WaitForPodCondition(c, namespace, podName, "terminated due to deadline exceeded", podStartTimeout, func(pod *v1.Pod) (bool, error) { + return WaitForPodCondition(c, namespace, podName, fmt.Sprintf("terminated with reason %s", reason), podStartTimeout, func(pod *v1.Pod) (bool, error) { // Only consider Failed pods. Successful pods will be deleted and detected in // waitForPodCondition's Get call returning `IsNotFound` if pod.Status.Phase == v1.PodFailed { @@ -295,33 +381,6 @@ func WaitForPodNameUnschedulableInNamespace(c clientset.Interface, podName, name }) } -// WaitForMatchPodsCondition finds match pods based on the input ListOptions. 
-// waits and checks if all match pods are in the given podCondition -func WaitForMatchPodsCondition(c clientset.Interface, opts metav1.ListOptions, desc string, timeout time.Duration, condition podCondition) error { - e2elog.Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc) - for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { - pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts) - if err != nil { - return err - } - conditionNotMatch := []string{} - for _, pod := range pods.Items { - done, err := condition(&pod) - if done && err != nil { - return fmt.Errorf("Unexpected error: %v", err) - } - if !done { - conditionNotMatch = append(conditionNotMatch, fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID)) - } - } - if len(conditionNotMatch) <= 0 { - return err - } - e2elog.Logf("%d pods are not %s: %v", len(conditionNotMatch), desc, conditionNotMatch) - } - return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout) -} - // WaitForPodNameRunningInNamespace waits default amount of time (PodStartTimeout) for the specified pod to become running. // Returns an error if timeout occurs first, or pod goes in to failed state. func WaitForPodNameRunningInNamespace(c clientset.Interface, podName, namespace string) error { @@ -337,7 +396,15 @@ func WaitForPodRunningInNamespaceSlow(c clientset.Interface, podName, namespace // WaitTimeoutForPodRunningInNamespace waits the given timeout duration for the specified pod to become running. 
func WaitTimeoutForPodRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, podRunning(c, podName, namespace)) + return WaitForPodCondition(c, namespace, podName, "running", timeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodRunning: + return true, nil + case v1.PodFailed, v1.PodSucceeded: + return false, errPodCompleted + } + return false, nil + }) } // WaitForPodRunningInNamespace waits default amount of time (podStartTimeout) for the specified pod to become running. @@ -351,7 +418,13 @@ func WaitForPodRunningInNamespace(c clientset.Interface, pod *v1.Pod) error { // WaitTimeoutForPodNoLongerRunningInNamespace waits the given timeout duration for the specified pod to stop. func WaitTimeoutForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, podCompleted(c, podName, namespace)) + return WaitForPodCondition(c, namespace, podName, "completed", timeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodFailed, v1.PodSucceeded: + return true, nil + } + return false, nil + }) } // WaitForPodNoLongerRunningInNamespace waits default amount of time (defaultPodDeletionTimeout) for the specified pod to stop running. @@ -363,14 +436,32 @@ func WaitForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namesp // WaitTimeoutForPodReadyInNamespace waits the given timeout duration for the // specified pod to be ready and running. 
func WaitTimeoutForPodReadyInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, podRunningAndReady(c, podName, namespace)) + return WaitForPodCondition(c, namespace, podName, "running and ready", timeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodFailed, v1.PodSucceeded: + e2elog.Logf("The phase of Pod %s is %s which is unexpected, pod status: %#v", pod.Name, pod.Status.Phase, pod.Status) + return false, errPodCompleted + case v1.PodRunning: + e2elog.Logf("The phase of Pod %s is %s (Ready = %v)", pod.Name, pod.Status.Phase, podutils.IsPodReady(pod)) + return podutils.IsPodReady(pod), nil + } + e2elog.Logf("The phase of Pod %s is %s, waiting for it to be Running (with Ready = true)", pod.Name, pod.Status.Phase) + return false, nil + }) } // WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state. // The resourceVersion is used when Watching object changes, it tells since when we care // about changes to the pod. func WaitForPodNotPending(c clientset.Interface, ns, podName string) error { - return wait.PollImmediate(poll, podStartTimeout, podNotPending(c, podName, ns)) + return WaitForPodCondition(c, ns, podName, "not pending", podStartTimeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodPending: + return false, nil + default: + return true, nil + } + }) } // WaitForPodSuccessInNamespace returns nil if the pod reached state success, or an error if it reached failure or until podStartupTimeout. @@ -388,32 +479,44 @@ func WaitForPodSuccessInNamespaceSlow(c clientset.Interface, podName string, nam // api returns IsNotFound then the wait stops and nil is returned. If the Get api returns an error other // than "not found" then that error is returned and the wait stops. 
func WaitForPodNotFoundInNamespace(c clientset.Interface, podName, ns string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, func() (bool, error) { - _, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + var lastPod *v1.Pod + err := wait.PollImmediate(poll, timeout, func() (bool, error) { + pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return true, nil // done } if err != nil { - return true, err // stop wait with error + return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName)) } + lastPod = pod return false, nil }) + if err == nil { + return nil + } + if IsTimeout(err) { + e2elog.Logf("Timed out while waiting for pod %s to be Not Found. Last observed as: %s", + podIdentifier(ns, podName), format.Object(lastPod, 1)) + } + return maybeTimeoutError(err, "waiting for pod %s not found", podIdentifier(ns, podName)) } // WaitForPodToDisappear waits the given timeout duration for the specified pod to disappear. 
func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labels.Selector, interval, timeout time.Duration) error { - return wait.PollImmediate(interval, timeout, func() (bool, error) { + var lastPod *v1.Pod + err := wait.PollImmediate(interval, timeout, func() (bool, error) { e2elog.Logf("Waiting for pod %s to disappear", podName) options := metav1.ListOptions{LabelSelector: label.String()} pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options) if err != nil { - return false, err + return handleWaitingAPIError(err, true, "listing pods") } found := false - for _, pod := range pods.Items { + for i, pod := range pods.Items { if pod.Name == podName { e2elog.Logf("Pod %s still exists", podName) found = true + lastPod = &(pods.Items[i]) break } } @@ -423,91 +526,65 @@ func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labe } return false, nil }) + if err == nil { + return nil + } + if IsTimeout(err) { + e2elog.Logf("Timed out while waiting for pod %s to disappear. Last observed as: %s", + podIdentifier(ns, podName), format.Object(lastPod, 1)) + } + return maybeTimeoutError(err, "waiting for pod %s to disappear", podIdentifier(ns, podName)) } // PodsResponding waits for the pods to response. func PodsResponding(c clientset.Interface, ns, name string, wantName bool, pods *v1.PodList) error { ginkgo.By("trying to dial each unique pod") label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) - return wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses) + err := wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses) + return maybeTimeoutError(err, "waiting for pods to be responsive") } // WaitForNumberOfPods waits up to timeout to ensure there are exact // `num` pods in namespace `ns`. // It returns the matching Pods or a timeout error. 
func WaitForNumberOfPods(c clientset.Interface, ns string, num int, timeout time.Duration) (pods *v1.PodList, err error) { + actualNum := 0 err = wait.PollImmediate(poll, timeout, func() (bool, error) { pods, err = c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) - // ignore intermittent network error if err != nil { - return false, nil + return handleWaitingAPIError(err, false, "listing pods") } - return len(pods.Items) == num, nil + actualNum = len(pods.Items) + return actualNum == num, nil }) - return + return pods, maybeTimeoutError(err, "waiting for there to be exactly %d pods in namespace (last seen %d)", num, actualNum) } // WaitForPodsWithLabelScheduled waits for all matching pods to become scheduled and at least one // matching pod exists. Return the list of matching pods. func WaitForPodsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) { - err = wait.PollImmediate(poll, podScheduledBeforeTimeout, - func() (bool, error) { - pods, err = WaitForPodsWithLabel(c, ns, label) - if err != nil { - return false, err - } - for _, pod := range pods.Items { - if pod.Spec.NodeName == "" { - return false, nil - } - } - return true, nil - }) - return pods, err + opts := metav1.ListOptions{LabelSelector: label.String()} + return WaitForAllPodsCondition(c, ns, opts, 1, "scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) { + if pod.Spec.NodeName == "" { + return false, nil + } + return true, nil + }) } // WaitForPodsWithLabel waits up to podListTimeout for getting pods with certain label -func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) { - for t := time.Now(); time.Since(t) < podListTimeout; time.Sleep(poll) { - options := metav1.ListOptions{LabelSelector: label.String()} - pods, err = c.CoreV1().Pods(ns).List(context.TODO(), options) - if err != nil { - return - } - if len(pods.Items) > 0 { - break - } - } - if pods == 
nil || len(pods.Items) == 0 { - err = fmt.Errorf("Timeout while waiting for pods with label %v", label) - } - return +func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (*v1.PodList, error) { + opts := metav1.ListOptions{LabelSelector: label.String()} + return WaitForAllPodsCondition(c, ns, opts, 1, "existent", podListTimeout, func(pod *v1.Pod) (bool, error) { + return true, nil + }) } // WaitForPodsWithLabelRunningReady waits for exact amount of matching pods to become running and ready. // Return the list of matching pods. func WaitForPodsWithLabelRunningReady(c clientset.Interface, ns string, label labels.Selector, num int, timeout time.Duration) (pods *v1.PodList, err error) { - var current int - err = wait.Poll(poll, timeout, - func() (bool, error) { - pods, err = WaitForPodsWithLabel(c, ns, label) - if err != nil { - e2elog.Logf("Failed to list pods: %v", err) - return false, err - } - current = 0 - for _, pod := range pods.Items { - if flag, err := testutils.PodRunningReady(&pod); err == nil && flag == true { - current++ - } - } - if current != num { - e2elog.Logf("Got %v pods running and ready, expect: %v", current, num) - return false, nil - } - return true, nil - }) - return pods, err + opts := metav1.ListOptions{LabelSelector: label.String()} + return WaitForAllPodsCondition(c, ns, opts, 1, "running and ready", podListTimeout, testutils.PodRunningReady) } // WaitForNRestartablePods tries to list restarting pods using ps until it finds expect of them, @@ -540,33 +617,101 @@ func WaitForNRestartablePods(ps *testutils.PodStore, expect int, timeout time.Du // invalid container configuration. In this case, the container will remain in a waiting state with a specific // reason set, which should match the given reason. 
func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason)) + conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason) + return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodPending: + if len(pod.Status.ContainerStatuses) == 0 { + return false, nil + } + containerStatus := pod.Status.ContainerStatuses[containerIndex] + if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason { + return true, nil + } + return false, nil + case v1.PodFailed, v1.PodRunning, v1.PodSucceeded: + return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase) + } + return false, nil + }) } // WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe. func WaitForPodContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, podContainerStarted(c, namespace, podName, containerIndex)) + conditionDesc := fmt.Sprintf("container %d started", containerIndex) + return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { + if containerIndex > len(pod.Status.ContainerStatuses)-1 { + return false, nil + } + containerStatus := pod.Status.ContainerStatuses[containerIndex] + return *containerStatus.Started, nil + }) } // WaitForPodFailedReason wait for pod failed reason in status, for example "SysctlForbidden". 
func WaitForPodFailedReason(c clientset.Interface, pod *v1.Pod, reason string, timeout time.Duration) error { - waitErr := wait.PollImmediate(poll, timeout, func() (bool, error) { - pod, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - if pod.Status.Reason == reason { - return true, nil + conditionDesc := fmt.Sprintf("failed with reason %s", reason) + return WaitForPodCondition(c, pod.Namespace, pod.Name, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { + switch pod.Status.Phase { + case v1.PodSucceeded: + return true, errors.New("pod succeeded unexpectedly") + case v1.PodFailed: + if pod.Status.Reason == reason { + return true, nil + } else { + return true, fmt.Errorf("pod failed with reason %s", pod.Status.Reason) + } } return false, nil }) - if waitErr != nil { - return fmt.Errorf("error waiting for pod failure status: %v", waitErr) - } - return nil } // WaitForContainerRunning waits for the given Pod container to have a state of running func WaitForContainerRunning(c clientset.Interface, namespace, podName, containerName string, timeout time.Duration) error { - return wait.PollImmediate(poll, timeout, isContainerRunning(c, namespace, podName, containerName)) + conditionDesc := fmt.Sprintf("container %s running", containerName) + return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) { + for _, statuses := range [][]v1.ContainerStatus{pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses} { + for _, cs := range statuses { + if cs.Name == containerName { + return cs.State.Running != nil, nil + } + } + } + return false, nil + }) +} + +// handleWaitingAPIError handles an error from an API request in the context of a Wait function. +// If the error is retryable, sleep the recommended delay and ignore the error. +// If the error is terminal, return it. 
+func handleWaitingAPIError(err error, retryNotFound bool, taskFormat string, taskArgs ...interface{}) (bool, error) { + taskDescription := fmt.Sprintf(taskFormat, taskArgs...) + if retryNotFound && apierrors.IsNotFound(err) { + e2elog.Logf("Ignoring NotFound error while " + taskDescription) + return false, nil + } + if retry, delay := shouldRetry(err); retry { + e2elog.Logf("Retryable error while %s, retrying after %v: %v", taskDescription, delay, err) + if delay > 0 { + time.Sleep(delay) + } + return false, nil + } + e2elog.Logf("Encountered non-retryable error while %s: %v", taskDescription, err) + return false, err +} + +// Decide whether to retry an API request. Optionally include a delay to retry after. +func shouldRetry(err error) (retry bool, retryAfter time.Duration) { + // if the error sends the Retry-After header, we respect it as an explicit confirmation we should retry. + if delay, shouldRetry := apierrors.SuggestsClientDelay(err); shouldRetry { + return shouldRetry, time.Duration(delay) * time.Second + } + + // these errors indicate a transient error that should be retried. + if apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) { + return true, 0 + } + + return false, 0 }