[E2E] Refactor pod polling functions (WaitForX) (#109704)

* Clean up WaitFor Pod functions

* Handle retryable errors when polling

* Log more context on timeout

* #squash Address PR feedback
This commit is contained in:
Tim Allclair 2022-05-05 01:42:32 -07:00 committed by GitHub
parent a685faa798
commit 07c34eb400
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 310 additions and 279 deletions

View File

@ -31,10 +31,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/podutils"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
testutils "k8s.io/kubernetes/test/utils"
@ -155,70 +153,6 @@ func (r ProxyResponseChecker) CheckAllResponses() (done bool, err error) {
return true, nil
}
// podRunning returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and reports whether it is Running.
//
// The condition yields (true, nil) for a Running pod, (false, errPodCompleted)
// for a pod that is already Failed or Succeeded, and (false, nil) for any other
// phase. An error from the Get call is returned unchanged.
func podRunning(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		p, getErr := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if getErr != nil {
			return false, getErr
		}
		phase := p.Status.Phase
		if phase == v1.PodRunning {
			return true, nil
		}
		if phase == v1.PodFailed || phase == v1.PodSucceeded {
			// A finished pod can never become Running.
			return false, errPodCompleted
		}
		return false, nil
	}
}
// podCompleted returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and reports true once its phase is Failed or
// Succeeded. An error from the Get call is returned unchanged.
func podCompleted(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		p, getErr := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if getErr != nil {
			return false, getErr
		}
		phase := p.Status.Phase
		finished := phase == v1.PodFailed || phase == v1.PodSucceeded
		return finished, nil
	}
}
// podRunningAndReady returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation, logs the observed phase, and reports whether
// the pod is Running with podutils.IsPodReady returning true.
//
// A pod that is already Failed or Succeeded yields (false, errPodCompleted).
// Any other phase yields (false, nil). An error from the Get call is returned
// unchanged.
func podRunningAndReady(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		p, getErr := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if getErr != nil {
			return false, getErr
		}
		phase := p.Status.Phase
		if phase == v1.PodFailed || phase == v1.PodSucceeded {
			e2elog.Logf("The status of Pod %s is %s which is unexpected, pod status: %#v", podName, phase, p.Status)
			return false, errPodCompleted
		}
		if phase == v1.PodRunning {
			ready := podutils.IsPodReady(p)
			e2elog.Logf("The status of Pod %s is %s (Ready = %v)", podName, phase, ready)
			return ready, nil
		}
		e2elog.Logf("The status of Pod %s is %s, waiting for it to be Running (with Ready = true)", podName, phase)
		return false, nil
	}
}
// podNotPending returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and reports true as soon as its phase is
// anything other than Pending. An error from the Get call is returned
// unchanged.
func podNotPending(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		p, getErr := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if getErr != nil {
			return false, getErr
		}
		return p.Status.Phase != v1.PodPending, nil
	}
}
// PodsCreated returns a pod list matched by the given name.
func PodsCreated(c clientset.Interface, ns, name string, replicas int32) (*v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
@ -305,60 +239,6 @@ func podsRunning(c clientset.Interface, pods *v1.PodList) []error {
return e
}
// podContainerFailed returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and reports true once, while the pod is still
// Pending, the container status at containerIndex is Waiting with a Reason
// equal to reason.
//
// The condition yields (false, nil) while the pod is Pending but the status at
// containerIndex has not been reported yet (or does not match), and an error
// when the pod has left the Pending phase (Failed, Running or Succeeded). An
// error from the Get call is returned unchanged.
func podContainerFailed(c clientset.Interface, namespace, podName string, containerIndex int, reason string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodPending:
			statuses := pod.Status.ContainerStatuses
			// Guard the index itself rather than only the empty slice: statuses
			// may be non-empty yet still shorter than containerIndex+1, and
			// indexing past the end would panic instead of continuing to poll.
			if containerIndex < 0 || containerIndex >= len(statuses) {
				return false, nil
			}
			waiting := statuses[containerIndex].State.Waiting
			return waiting != nil && waiting.Reason == reason, nil
		case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
			return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
		}
		return false, nil
	}
}
// podContainerStarted returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and reports the Started flag of the container
// status at containerIndex.
//
// The condition yields (false, nil) while no status exists at containerIndex
// or while its Started pointer is unset. An error from the Get call is
// returned unchanged.
func podContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		statuses := pod.Status.ContainerStatuses
		// Reject negative as well as too-large indexes so the lookup below
		// cannot panic.
		if containerIndex < 0 || containerIndex >= len(statuses) {
			return false, nil
		}
		// Started is a pointer; treat an unset value as "not started yet"
		// instead of dereferencing nil.
		started := statuses[containerIndex].Started
		return started != nil && *started, nil
	}
}
// isContainerRunning returns a wait.ConditionFunc that fetches pod podName in
// namespace on every invocation and looks for a container status named
// containerName among the regular, init and ephemeral container statuses (in
// that order). The first status with a matching name decides the result: true
// when its Running state is set, false otherwise. If no status matches, the
// condition yields (false, nil). An error from the Get call is returned
// unchanged.
func isContainerRunning(c clientset.Interface, namespace, podName, containerName string) wait.ConditionFunc {
	return func() (bool, error) {
		p, getErr := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if getErr != nil {
			return false, getErr
		}
		groups := [...][]v1.ContainerStatus{
			p.Status.ContainerStatuses,
			p.Status.InitContainerStatuses,
			p.Status.EphemeralContainerStatuses,
		}
		for g := range groups {
			for i := range groups[g] {
				status := &groups[g][i]
				if status.Name != containerName {
					continue
				}
				return status.State.Running != nil, nil
			}
		}
		return false, nil
	}
}
// LogPodStates logs basic info of provided pods for debugging.
func LogPodStates(pods []v1.Pod) {
// Find maximum widths for pod, node, and phase strings for column printing.
@ -565,13 +445,7 @@ func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tw
}
execPod, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(context.TODO(), execPod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return retrievedPod.Status.Phase == v1.PodRunning, nil
})
err = WaitForPodNameRunningInNamespace(client, execPod.Name, execPod.Namespace)
expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
return execPod
}
@ -769,3 +643,15 @@ func IsPodActive(p *v1.Pod) bool {
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
}
// podIdentifier renders a pod reference as "<namespace>/<name>".
func podIdentifier(namespace, name string) string {
	// All operands are strings, so fmt.Sprint concatenates them without
	// inserting separators.
	return fmt.Sprint(namespace, "/", name)
}
// identifier renders pod as "<namespace>/<name>", followed by "(<uid>)" when
// the pod's UID is non-empty.
func identifier(pod *v1.Pod) string {
	base := podIdentifier(pod.Namespace, pod.Name)
	if pod.UID == "" {
		return base
	}
	return base + fmt.Sprintf("(%s)", pod.UID)
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega/format"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -65,17 +66,51 @@ const (
type podCondition func(pod *v1.Pod) (bool, error)
// errorBadPodsStates create error message of basic info of bad pods for debugging.
func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) string {
errStr := fmt.Sprintf("%d / %d pods in namespace %q are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout)
if err != nil {
errStr += fmt.Sprintf("Last error: %s\n", err)
// timeoutError is the error type produced by TimeoutError. It carries only a
// pre-rendered message.
type timeoutError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (e *timeoutError) Error() string {
	return e.msg
}

// TimeoutError builds a *timeoutError whose message is format rendered with
// args in fmt.Sprintf style.
//
// When no args are supplied, format is used verbatim. Callers in this file
// pass fully rendered messages (built from pod names, states and tables), and
// running those through Sprintf would turn any literal '%' into
// "%!x(MISSING)" noise.
func TimeoutError(format string, args ...interface{}) *timeoutError {
	msg := format
	if len(args) > 0 {
		msg = fmt.Sprintf(format, args...)
	}
	return &timeoutError{msg: msg}
}
// maybeTimeoutError normalizes the outcome of a wait operation:
//   - a nil err stays nil;
//   - an err recognized by IsTimeout becomes a TimeoutError reading
//     "timed out while <task>";
//   - any other err is wrapped as "error while <task>: <err>".
//
// taskFormat and taskArgs describe the task being performed when the error
// occurred, e.g. "waiting for pod to be running".
func maybeTimeoutError(err error, taskFormat string, taskArgs ...interface{}) error {
	if err == nil {
		return nil
	}
	if IsTimeout(err) {
		return TimeoutError("timed out while "+taskFormat, taskArgs...)
	}
	task := fmt.Sprintf(taskFormat, taskArgs...)
	return fmt.Errorf("error while %s: %w", task, err)
}
// IsTimeout reports whether err is, or wraps, a timeout: either
// wait.ErrWaitTimeout or an error created by TimeoutError. A nil err is not a
// timeout.
//
// The error chain is inspected with errors.Is / errors.As so that a timeout
// which has been wrapped with additional context (for example via
// fmt.Errorf("...: %w", err)) is still recognized.
func IsTimeout(err error) bool {
	if errors.Is(err, wait.ErrWaitTimeout) {
		return true
	}
	var timeoutErr *timeoutError
	return errors.As(err, &timeoutErr)
}
// errorBadPodsStates create error message of basic info of bad pods for debugging.
func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) error {
errStr := fmt.Sprintf("%d / %d pods in namespace %s are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout)
// Print bad pods info only if there are fewer than 10 bad pods
if len(badPods) > 10 {
return errStr + "There are too many bad pods. Please check log for details."
}
errStr += "There are too many bad pods. Please check log for details."
} else {
buf := bytes.NewBuffer(nil)
w := tabwriter.NewWriter(buf, 0, 0, 1, ' ', 0)
fmt.Fprintln(w, "POD\tNODE\tPHASE\tGRACE\tCONDITIONS")
@ -89,7 +124,13 @@ func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState stri
fmt.Fprintln(w, podInfo)
}
w.Flush()
return errStr + buf.String()
errStr += buf.String()
}
if err != nil && !IsTimeout(err) {
return fmt.Errorf("%s\nLast error: %w", errStr, err)
}
return TimeoutError(errStr)
}
// WaitForPodsRunningReady waits up to timeout to ensure that all pods in
@ -129,10 +170,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
lastAPIError = nil
rcList, err := c.CoreV1().ReplicationControllers(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
e2elog.Logf("Error getting replication controllers in namespace '%s': %v", ns, err)
lastAPIError = err
return false, err
if err != nil {
return handleWaitingAPIError(err, false, "listing replication controllers in namespace %s", ns)
}
for _, rc := range rcList.Items {
replicas += *rc.Spec.Replicas
@ -140,10 +180,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
}
rsList, err := c.AppsV1().ReplicaSets(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
lastAPIError = err
e2elog.Logf("Error getting replication sets in namespace %q: %v", ns, err)
return false, err
if err != nil {
return handleWaitingAPIError(err, false, "listing replication sets in namespace %s", ns)
}
for _, rs := range rsList.Items {
replicas += *rs.Spec.Replicas
@ -151,10 +190,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
}
podList, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
lastAPIError = err
e2elog.Logf("Error getting pods in namespace '%s': %v", ns, err)
return false, err
if err != nil {
return handleWaitingAPIError(err, false, "listing pods in namespace %s", ns)
}
nOk := int32(0)
notReady = int32(0)
@ -197,7 +235,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
return false, nil
}) != nil {
if !ignoreNotReady {
return errors.New(errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError))
return errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError)
}
e2elog.Logf("Number of not-ready pods (%d) is below the allowed threshold (%d).", notReady, allowedNotReadyPods)
}
@ -205,35 +243,83 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
}
// WaitForPodCondition waits for a pod to match the given condition.
func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeout time.Duration, condition podCondition) error {
e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, desc)
var lastPodError error
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc string, timeout time.Duration, condition podCondition) error {
e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, conditionDesc)
var (
lastPodError error
lastPod *v1.Pod
start = time.Now()
)
err := wait.PollImmediate(poll, timeout, func() (bool, error) {
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
lastPodError = err
if err != nil {
if apierrors.IsNotFound(err) {
e2elog.Logf("Pod %q in namespace %q not found. Error: %v", podName, ns, err)
} else {
e2elog.Logf("Get pod %q in namespace %q failed, ignoring for %v. Error: %v", podName, ns, poll, err)
}
continue
return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName))
}
lastPod = pod // Don't overwrite if an error occurs after successfully retrieving.
// log now so that current pod info is reported before calling `condition()`
e2elog.Logf("Pod %q: Phase=%q, Reason=%q, readiness=%t. Elapsed: %v",
podName, pod.Status.Phase, pod.Status.Reason, podutils.IsPodReady(pod), time.Since(start))
if done, err := condition(pod); done {
if err == nil {
e2elog.Logf("Pod %q satisfied condition %q", podName, desc)
e2elog.Logf("Pod %q satisfied condition %q", podName, conditionDesc)
}
return err
return true, err
} else if err != nil {
// TODO(#109732): stop polling and return the error in this case.
e2elog.Logf("Error evaluating pod condition %s: %v", conditionDesc, err)
}
return false, nil
})
if err == nil {
return nil
}
if IsTimeout(err) {
e2elog.Logf("Timed out while waiting for pod %s to be %s. Last observed as: %s",
podIdentifier(ns, podName), conditionDesc, format.Object(lastPod, 1))
}
if lastPodError != nil {
// If the last API call was an error.
err = lastPodError
}
return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
}
// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
e2elog.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
var pods *v1.PodList
matched := 0
err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
pods, err = c.CoreV1().Pods(ns).List(context.TODO(), opts)
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
if len(pods.Items) < minPods {
e2elog.Logf("found %d pods, waiting for at least %d", len(pods.Items), minPods)
return false, nil
}
nonMatchingPods := []string{}
for _, pod := range pods.Items {
done, err := condition(&pod)
if done && err != nil {
return false, fmt.Errorf("error evaluating pod %s: %w", identifier(&pod), err)
}
if !done {
nonMatchingPods = append(nonMatchingPods, identifier(&pod))
}
}
if apierrors.IsNotFound(lastPodError) {
// return for compatbility with other functions testing for IsNotFound
return lastPodError
matched = len(pods.Items) - len(nonMatchingPods)
if len(nonMatchingPods) <= 0 {
return true, nil // All pods match.
}
return fmt.Errorf("Gave up after waiting %v for pod %q to be %q", timeout, podName, desc)
e2elog.Logf("%d pods are not %s: %v", len(nonMatchingPods), conditionDesc, nonMatchingPods)
return false, nil
})
return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
}
// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
@ -242,7 +328,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeou
// terminated (reason==""), but may be called to detect if a pod did *not* terminate according to
// the supplied reason.
func WaitForPodTerminatedInNamespace(c clientset.Interface, podName, reason, namespace string) error {
return WaitForPodCondition(c, namespace, podName, "terminated due to deadline exceeded", podStartTimeout, func(pod *v1.Pod) (bool, error) {
return WaitForPodCondition(c, namespace, podName, fmt.Sprintf("terminated with reason %s", reason), podStartTimeout, func(pod *v1.Pod) (bool, error) {
// Only consider Failed pods. Successful pods will be deleted and detected in
// waitForPodCondition's Get call returning `IsNotFound`
if pod.Status.Phase == v1.PodFailed {
@ -295,33 +381,6 @@ func WaitForPodNameUnschedulableInNamespace(c clientset.Interface, podName, name
})
}
// WaitForMatchPodsCondition repeatedly lists pods across all namespaces using
// opts and succeeds once every listed pod satisfies condition.
//
// It returns nil as soon as no listed pod fails the condition, the List error
// if listing fails, an "Unexpected error" if condition reports done together
// with an error, or a "gave up" error once timeout has elapsed. desc is used
// only in log and error messages.
func WaitForMatchPodsCondition(c clientset.Interface, opts metav1.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
	e2elog.Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
	begin := time.Now()
	for time.Since(begin) < timeout {
		podList, listErr := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
		if listErr != nil {
			return listErr
		}
		unmatched := make([]string, 0, len(podList.Items))
		for i := range podList.Items {
			// Evaluate the condition on a copy of the listed pod.
			pod := podList.Items[i]
			matched, condErr := condition(&pod)
			if matched && condErr != nil {
				return fmt.Errorf("Unexpected error: %v", condErr)
			}
			if matched {
				continue
			}
			unmatched = append(unmatched, fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID))
		}
		if len(unmatched) == 0 {
			return nil
		}
		e2elog.Logf("%d pods are not %s: %v", len(unmatched), desc, unmatched)
		time.Sleep(poll)
	}
	return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
}
// WaitForPodNameRunningInNamespace waits default amount of time (PodStartTimeout) for the specified pod to become running.
// Returns an error if timeout occurs first, or pod goes in to failed state.
func WaitForPodNameRunningInNamespace(c clientset.Interface, podName, namespace string) error {
@ -337,7 +396,15 @@ func WaitForPodRunningInNamespaceSlow(c clientset.Interface, podName, namespace
// WaitTimeoutForPodRunningInNamespace waits the given timeout duration for the specified pod to become running.
func WaitTimeoutForPodRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podRunning(c, podName, namespace))
return WaitForPodCondition(c, namespace, podName, "running", timeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodRunning:
return true, nil
case v1.PodFailed, v1.PodSucceeded:
return false, errPodCompleted
}
return false, nil
})
}
// WaitForPodRunningInNamespace waits default amount of time (podStartTimeout) for the specified pod to become running.
@ -351,7 +418,13 @@ func WaitForPodRunningInNamespace(c clientset.Interface, pod *v1.Pod) error {
// WaitTimeoutForPodNoLongerRunningInNamespace waits the given timeout duration for the specified pod to stop.
func WaitTimeoutForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podCompleted(c, podName, namespace))
return WaitForPodCondition(c, namespace, podName, "completed", timeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodFailed, v1.PodSucceeded:
return true, nil
}
return false, nil
})
}
// WaitForPodNoLongerRunningInNamespace waits default amount of time (defaultPodDeletionTimeout) for the specified pod to stop running.
@ -363,14 +436,32 @@ func WaitForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namesp
// WaitTimeoutForPodReadyInNamespace waits the given timeout duration for the
// specified pod to be ready and running.
func WaitTimeoutForPodReadyInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podRunningAndReady(c, podName, namespace))
return WaitForPodCondition(c, namespace, podName, "running and ready", timeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodFailed, v1.PodSucceeded:
e2elog.Logf("The phase of Pod %s is %s which is unexpected, pod status: %#v", pod.Name, pod.Status.Phase, pod.Status)
return false, errPodCompleted
case v1.PodRunning:
e2elog.Logf("The phase of Pod %s is %s (Ready = %v)", pod.Name, pod.Status.Phase, podutils.IsPodReady(pod))
return podutils.IsPodReady(pod), nil
}
e2elog.Logf("The phase of Pod %s is %s, waiting for it to be Running (with Ready = true)", pod.Name, pod.Status.Phase)
return false, nil
})
}
// WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
// The resourceVersion is used when Watching object changes, it tells since when we care
// about changes to the pod.
func WaitForPodNotPending(c clientset.Interface, ns, podName string) error {
return wait.PollImmediate(poll, podStartTimeout, podNotPending(c, podName, ns))
return WaitForPodCondition(c, ns, podName, "not pending", podStartTimeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodPending:
return false, nil
default:
return true, nil
}
})
}
// WaitForPodSuccessInNamespace returns nil if the pod reached state success, or an error if it reached failure or until podStartupTimeout.
@ -388,32 +479,44 @@ func WaitForPodSuccessInNamespaceSlow(c clientset.Interface, podName string, nam
// api returns IsNotFound then the wait stops and nil is returned. If the Get api returns an error other
// than "not found" then that error is returned and the wait stops.
func WaitForPodNotFoundInNamespace(c clientset.Interface, podName, ns string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, func() (bool, error) {
_, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
var lastPod *v1.Pod
err := wait.PollImmediate(poll, timeout, func() (bool, error) {
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return true, nil // done
}
if err != nil {
return true, err // stop wait with error
return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName))
}
lastPod = pod
return false, nil
})
if err == nil {
return nil
}
if IsTimeout(err) {
e2elog.Logf("Timed out while waiting for pod %s to be Not Found. Last observed as: %s",
podIdentifier(ns, podName), format.Object(lastPod, 1))
}
return maybeTimeoutError(err, "waiting for pod %s not found", podIdentifier(ns, podName))
}
// WaitForPodToDisappear waits the given timeout duration for the specified pod to disappear.
func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labels.Selector, interval, timeout time.Duration) error {
return wait.PollImmediate(interval, timeout, func() (bool, error) {
var lastPod *v1.Pod
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
e2elog.Logf("Waiting for pod %s to disappear", podName)
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
if err != nil {
return false, err
return handleWaitingAPIError(err, true, "listing pods")
}
found := false
for _, pod := range pods.Items {
for i, pod := range pods.Items {
if pod.Name == podName {
e2elog.Logf("Pod %s still exists", podName)
found = true
lastPod = &(pods.Items[i])
break
}
}
@ -423,91 +526,65 @@ func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labe
}
return false, nil
})
if err == nil {
return nil
}
if IsTimeout(err) {
e2elog.Logf("Timed out while waiting for pod %s to disappear. Last observed as: %s",
podIdentifier(ns, podName), format.Object(lastPod, 1))
}
return maybeTimeoutError(err, "waiting for pod %s to disappear", podIdentifier(ns, podName))
}
// PodsResponding waits for the pods to respond.
func PodsResponding(c clientset.Interface, ns, name string, wantName bool, pods *v1.PodList) error {
ginkgo.By("trying to dial each unique pod")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
return wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
err := wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
return maybeTimeoutError(err, "waiting for pods to be responsive")
}
// WaitForNumberOfPods waits up to timeout to ensure there are exact
// `num` pods in namespace `ns`.
// It returns the matching Pods or a timeout error.
func WaitForNumberOfPods(c clientset.Interface, ns string, num int, timeout time.Duration) (pods *v1.PodList, err error) {
actualNum := 0
err = wait.PollImmediate(poll, timeout, func() (bool, error) {
pods, err = c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
// ignore intermittent network error
if err != nil {
return false, nil
return handleWaitingAPIError(err, false, "listing pods")
}
return len(pods.Items) == num, nil
actualNum = len(pods.Items)
return actualNum == num, nil
})
return
return pods, maybeTimeoutError(err, "waiting for there to be exactly %d pods in namespace (last seen %d)", num, actualNum)
}
// WaitForPodsWithLabelScheduled waits for all matching pods to become scheduled and at least one
// matching pod exists. Return the list of matching pods.
func WaitForPodsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {
err = wait.PollImmediate(poll, podScheduledBeforeTimeout,
func() (bool, error) {
pods, err = WaitForPodsWithLabel(c, ns, label)
if err != nil {
return false, err
}
for _, pod := range pods.Items {
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(c, ns, opts, 1, "scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) {
if pod.Spec.NodeName == "" {
return false, nil
}
}
return true, nil
})
return pods, err
}
// WaitForPodsWithLabel waits up to podListTimeout for getting pods with certain label
func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {
for t := time.Now(); time.Since(t) < podListTimeout; time.Sleep(poll) {
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err = c.CoreV1().Pods(ns).List(context.TODO(), options)
if err != nil {
return
}
if len(pods.Items) > 0 {
break
}
}
if pods == nil || len(pods.Items) == 0 {
err = fmt.Errorf("Timeout while waiting for pods with label %v", label)
}
return
func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (*v1.PodList, error) {
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(c, ns, opts, 1, "existent", podListTimeout, func(pod *v1.Pod) (bool, error) {
return true, nil
})
}
// WaitForPodsWithLabelRunningReady waits for exact amount of matching pods to become running and ready.
// Return the list of matching pods.
func WaitForPodsWithLabelRunningReady(c clientset.Interface, ns string, label labels.Selector, num int, timeout time.Duration) (pods *v1.PodList, err error) {
var current int
err = wait.Poll(poll, timeout,
func() (bool, error) {
pods, err = WaitForPodsWithLabel(c, ns, label)
if err != nil {
e2elog.Logf("Failed to list pods: %v", err)
return false, err
}
current = 0
for _, pod := range pods.Items {
if flag, err := testutils.PodRunningReady(&pod); err == nil && flag == true {
current++
}
}
if current != num {
e2elog.Logf("Got %v pods running and ready, expect: %v", current, num)
return false, nil
}
return true, nil
})
return pods, err
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(c, ns, opts, 1, "running and ready", podListTimeout, testutils.PodRunningReady)
}
// WaitForNRestartablePods tries to list restarting pods using ps until it finds expect of them,
@ -540,33 +617,101 @@ func WaitForNRestartablePods(ps *testutils.PodStore, expect int, timeout time.Du
// invalid container configuration. In this case, the container will remain in a waiting state with a specific
// reason set, which should match the given reason.
func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason))
conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason)
return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodPending:
if len(pod.Status.ContainerStatuses) == 0 {
return false, nil
}
containerStatus := pod.Status.ContainerStatuses[containerIndex]
if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason {
return true, nil
}
return false, nil
case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
}
return false, nil
})
}
// WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe.
func WaitForPodContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, podContainerStarted(c, namespace, podName, containerIndex))
conditionDesc := fmt.Sprintf("container %d started", containerIndex)
return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
if containerIndex > len(pod.Status.ContainerStatuses)-1 {
return false, nil
}
containerStatus := pod.Status.ContainerStatuses[containerIndex]
return *containerStatus.Started, nil
})
}
// WaitForPodFailedReason wait for pod failed reason in status, for example "SysctlForbidden".
func WaitForPodFailedReason(c clientset.Interface, pod *v1.Pod, reason string, timeout time.Duration) error {
waitErr := wait.PollImmediate(poll, timeout, func() (bool, error) {
pod, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
conditionDesc := fmt.Sprintf("failed with reason %s", reason)
return WaitForPodCondition(c, pod.Namespace, pod.Name, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodSucceeded:
return true, errors.New("pod succeeded unexpectedly")
case v1.PodFailed:
if pod.Status.Reason == reason {
return true, nil
} else {
return true, fmt.Errorf("pod failed with reason %s", pod.Status.Reason)
}
}
return false, nil
})
if waitErr != nil {
return fmt.Errorf("error waiting for pod failure status: %v", waitErr)
}
return nil
}
// WaitForContainerRunning waits for the given Pod container to have a state of running
func WaitForContainerRunning(c clientset.Interface, namespace, podName, containerName string, timeout time.Duration) error {
return wait.PollImmediate(poll, timeout, isContainerRunning(c, namespace, podName, containerName))
conditionDesc := fmt.Sprintf("container %s running", containerName)
return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
for _, statuses := range [][]v1.ContainerStatus{pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses} {
for _, cs := range statuses {
if cs.Name == containerName {
return cs.State.Running != nil, nil
}
}
}
return false, nil
})
}
// handleWaitingAPIError handles an error from an API request in the context of a Wait function.
// If the error is retryable, sleep the recommended delay and ignore the error.
// If the error is terminal, return it.
//
// When retryNotFound is true, a NotFound error is also ignored. taskFormat and
// taskArgs describe the request being made and are used only in log messages.
//
// The results are meant to be returned directly from a polling condition:
// (false, nil) means "keep polling", (false, err) means "stop with err".
func handleWaitingAPIError(err error, retryNotFound bool, taskFormat string, taskArgs ...interface{}) (bool, error) {
	taskDescription := fmt.Sprintf(taskFormat, taskArgs...)
	if retryNotFound && apierrors.IsNotFound(err) {
		// Pass the description as an argument, not as part of the format
		// string, so a '%' in it is not misinterpreted as a verb.
		e2elog.Logf("Ignoring NotFound error while %s", taskDescription)
		return false, nil
	}
	if retry, delay := shouldRetry(err); retry {
		e2elog.Logf("Retryable error while %s, retrying after %v: %v", taskDescription, delay, err)
		if delay > 0 {
			time.Sleep(delay)
		}
		return false, nil
	}
	e2elog.Logf("Encountered non-retryable error while %s: %v", taskDescription, err)
	return false, err
}
// shouldRetry decides whether an API request that failed with err should be
// retried, and optionally how long to wait before doing so.
func shouldRetry(err error) (retry bool, retryAfter time.Duration) {
	// A Retry-After hint on the error is taken as explicit confirmation that
	// the request should be retried after the suggested number of seconds.
	if seconds, suggested := apierrors.SuggestsClientDelay(err); suggested {
		return true, time.Duration(seconds) * time.Second
	}
	// Internal errors, timeouts and throttling are treated as transient and
	// retried without an extra delay; everything else is not retried.
	transient := apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err)
	return transient, 0
}