mirror of https://github.com/k3s-io/kubernetes.git
[E2E] Refactor pod polling functions (WaitForX) (#109704)
* Clean up WaitFor Pod functions
* Handle retryable errors when polling
* Log more context on timeout
* #squash Address PR feedback
parent a685faa798
commit 07c34eb400
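For orientation: the refactor below funnels every pod waiter through a single polling core, so existing call sites keep compiling while errors gain context. A minimal sketch of such a caller, assuming the usual e2epod import alias; the pod name and namespace are made up:

package example

import (
	"time"

	clientset "k8s.io/client-go/kubernetes"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

// waitForServerPod is a hypothetical caller; "server" and "default" are illustrative.
// After this commit a timeout surfaces as "timed out while waiting for pod
// default/server to be running" rather than a bare wait.ErrWaitTimeout.
func waitForServerPod(c clientset.Interface) error {
	return e2epod.WaitTimeoutForPodRunningInNamespace(c, "server", "default", 2*time.Minute)
}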
@@ -31,10 +31,8 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubectl/pkg/util/podutils"

	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
	testutils "k8s.io/kubernetes/test/utils"
@@ -155,70 +153,6 @@ func (r ProxyResponseChecker) CheckAllResponses() (done bool, err error) {
	return true, nil
}

func podRunning(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodRunning:
			return true, nil
		case v1.PodFailed, v1.PodSucceeded:
			return false, errPodCompleted
		}
		return false, nil
	}
}

func podCompleted(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodFailed, v1.PodSucceeded:
			return true, nil
		}
		return false, nil
	}
}

func podRunningAndReady(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodFailed, v1.PodSucceeded:
			e2elog.Logf("The status of Pod %s is %s which is unexpected, pod status: %#v", podName, pod.Status.Phase, pod.Status)
			return false, errPodCompleted
		case v1.PodRunning:
			e2elog.Logf("The status of Pod %s is %s (Ready = %v)", podName, pod.Status.Phase, podutils.IsPodReady(pod))
			return podutils.IsPodReady(pod), nil
		}
		e2elog.Logf("The status of Pod %s is %s, waiting for it to be Running (with Ready = true)", podName, pod.Status.Phase)
		return false, nil
	}
}

func podNotPending(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodPending:
			return false, nil
		default:
			return true, nil
		}
	}
}

// PodsCreated returns a pod list matched by the given name.
func PodsCreated(c clientset.Interface, ns, name string, replicas int32) (*v1.PodList, error) {
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
@@ -305,60 +239,6 @@ func podsRunning(c clientset.Interface, pods *v1.PodList) []error {
	return e
}

func podContainerFailed(c clientset.Interface, namespace, podName string, containerIndex int, reason string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodPending:
			if len(pod.Status.ContainerStatuses) == 0 {
				return false, nil
			}
			containerStatus := pod.Status.ContainerStatuses[containerIndex]
			if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason {
				return true, nil
			}
			return false, nil
		case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
			return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
		}
		return false, nil
	}
}

func podContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if containerIndex > len(pod.Status.ContainerStatuses)-1 {
			return false, nil
		}
		containerStatus := pod.Status.ContainerStatuses[containerIndex]
		return *containerStatus.Started, nil
	}
}

func isContainerRunning(c clientset.Interface, namespace, podName, containerName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, statuses := range [][]v1.ContainerStatus{pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses} {
			for _, cs := range statuses {
				if cs.Name == containerName {
					return cs.State.Running != nil, nil
				}
			}
		}
		return false, nil
	}
}

// LogPodStates logs basic info of provided pods for debugging.
func LogPodStates(pods []v1.Pod) {
	// Find maximum widths for pod, node, and phase strings for column printing.
@@ -565,13 +445,7 @@ func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tw
	}
	execPod, err := client.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
	expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
	err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) {
		retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(context.TODO(), execPod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return retrievedPod.Status.Phase == v1.PodRunning, nil
	})
	err = WaitForPodNameRunningInNamespace(client, execPod.Name, execPod.Namespace)
	expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
	return execPod
}
@@ -769,3 +643,15 @@ func IsPodActive(p *v1.Pod) bool {
		v1.PodFailed != p.Status.Phase &&
		p.DeletionTimestamp == nil
}

func podIdentifier(namespace, name string) string {
	return fmt.Sprintf("%s/%s", namespace, name)
}

func identifier(pod *v1.Pod) string {
	id := podIdentifier(pod.Namespace, pod.Name)
	if pod.UID != "" {
		id += fmt.Sprintf("(%s)", pod.UID)
	}
	return id
}
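The two helpers just added fix how pods are named in the new log and error messages. Since they are unexported, here is a standalone sketch replicating their output (the pod fields are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Reproduces the podIdentifier/identifier formats from the diff above.
func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "server", UID: "f81a5c67"}}
	fmt.Printf("%s/%s\n", pod.Namespace, pod.Name)              // default/server
	fmt.Printf("%s/%s(%s)\n", pod.Namespace, pod.Name, pod.UID) // default/server(f81a5c67)
}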
@@ -25,6 +25,7 @@ import (
	"time"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega/format"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -65,17 +66,51 @@ const (

type podCondition func(pod *v1.Pod) (bool, error)

// errorBadPodsStates creates an error message of basic info of bad pods for debugging.
func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) string {
	errStr := fmt.Sprintf("%d / %d pods in namespace %q are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout)
	if err != nil {
		errStr += fmt.Sprintf("Last error: %s\n", err)
	}
	// Print bad pods info only if there are fewer than 10 bad pods
	if len(badPods) > 10 {
		return errStr + "There are too many bad pods. Please check log for details."
type timeoutError struct {
	msg string
}

func (e *timeoutError) Error() string {
	return e.msg
}

func TimeoutError(format string, args ...interface{}) *timeoutError {
	return &timeoutError{
		msg: fmt.Sprintf(format, args...),
	}
}

// maybeTimeoutError returns a TimeoutError if err is a timeout. Otherwise, wrap err.
// taskFormat and taskArgs should be the task being performed when the error occurred,
// e.g. "waiting for pod to be running".
func maybeTimeoutError(err error, taskFormat string, taskArgs ...interface{}) error {
	if IsTimeout(err) {
		return TimeoutError("timed out while "+taskFormat, taskArgs...)
	} else if err != nil {
		return fmt.Errorf("error while %s: %w", fmt.Sprintf(taskFormat, taskArgs...), err)
	} else {
		return nil
	}
}

func IsTimeout(err error) bool {
	if err == wait.ErrWaitTimeout {
		return true
	}
	if _, ok := err.(*timeoutError); ok {
		return true
	}
	return false
}

// errorBadPodsStates creates an error message of basic info of bad pods for debugging.
func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState string, timeout time.Duration, err error) error {
	errStr := fmt.Sprintf("%d / %d pods in namespace %s are NOT in %s state in %v\n", len(badPods), desiredPods, ns, desiredState, timeout)

	// Print bad pods info only if there are fewer than 10 bad pods
	if len(badPods) > 10 {
		errStr += "There are too many bad pods. Please check log for details."
	} else {
		buf := bytes.NewBuffer(nil)
		w := tabwriter.NewWriter(buf, 0, 0, 1, ' ', 0)
		fmt.Fprintln(w, "POD\tNODE\tPHASE\tGRACE\tCONDITIONS")
@@ -89,7 +124,13 @@ func errorBadPodsStates(badPods []v1.Pod, desiredPods int, ns, desiredState stri
			fmt.Fprintln(w, podInfo)
		}
		w.Flush()
		return errStr + buf.String()
		errStr += buf.String()
	}

	if err != nil && !IsTimeout(err) {
		return fmt.Errorf("%s\nLast error: %w", errStr, err)
	}
	return TimeoutError(errStr)
}
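The pair maybeTimeoutError/IsTimeout above defines the error contract the rest of the diff relies on. A minimal sketch exercising both branches (written as if in the same package, since the helpers are unexported; the task strings are illustrative):

package pod

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

// demoTimeoutWrapping exercises both branches of maybeTimeoutError.
func demoTimeoutWrapping() {
	// A poll that ended in wait.ErrWaitTimeout becomes a *timeoutError
	// carrying the task description, so IsTimeout(err) reports true.
	err := maybeTimeoutError(wait.ErrWaitTimeout, "waiting for pod %s to be running", "default/server")
	fmt.Println(IsTimeout(err), err) // true timed out while waiting for pod default/server to be running

	// Any other error is wrapped with %w, so the cause stays inspectable.
	err = maybeTimeoutError(fmt.Errorf("connection refused"), "listing pods")
	fmt.Println(IsTimeout(err), err) // false error while listing pods: connection refused
}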

// WaitForPodsRunningReady waits up to timeout to ensure that all pods in
@@ -129,10 +170,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
		lastAPIError = nil

		rcList, err := c.CoreV1().ReplicationControllers(ns).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			e2elog.Logf("Error getting replication controllers in namespace '%s': %v", ns, err)
			lastAPIError = err
			return false, err
		if err != nil {
			return handleWaitingAPIError(err, false, "listing replication controllers in namespace %s", ns)
		}
		for _, rc := range rcList.Items {
			replicas += *rc.Spec.Replicas
@@ -140,10 +180,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
		}

		rsList, err := c.AppsV1().ReplicaSets(ns).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			lastAPIError = err
			e2elog.Logf("Error getting replication sets in namespace %q: %v", ns, err)
			return false, err
		if err != nil {
			return handleWaitingAPIError(err, false, "listing replication sets in namespace %s", ns)
		}
		for _, rs := range rsList.Items {
			replicas += *rs.Spec.Replicas
@@ -151,10 +190,9 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
		}

		podList, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			lastAPIError = err
			e2elog.Logf("Error getting pods in namespace '%s': %v", ns, err)
			return false, err
		if err != nil {
			return handleWaitingAPIError(err, false, "listing pods in namespace %s", ns)
		}
		nOk := int32(0)
		notReady = int32(0)
@@ -197,7 +235,7 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
			return false, nil
		}) != nil {
		if !ignoreNotReady {
			return errors.New(errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError))
			return errorBadPodsStates(badPods, desiredPods, ns, "RUNNING and READY", timeout, lastAPIError)
		}
		e2elog.Logf("Number of not-ready pods (%d) is below the allowed threshold (%d).", notReady, allowedNotReadyPods)
	}
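Each of the three list calls above now follows the same shape; distilled into a standalone sketch (written as if in the same package, since handleWaitingAPIError is unexported; the interval and namespace are illustrative):

package pod

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

// pollPattern shows the recurring shape: retryable API errors keep the poll
// alive via handleWaitingAPIError, terminal ones abort it immediately.
func pollPattern(c clientset.Interface) error {
	return wait.PollImmediate(2*time.Second, time.Minute, func() (bool, error) {
		podList, err := c.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return handleWaitingAPIError(err, false, "listing pods in namespace %s", "default")
		}
		return len(podList.Items) > 0, nil
	})
}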
@@ -205,35 +243,83 @@ func WaitForPodsRunningReady(c clientset.Interface, ns string, minPods, allowedN
	}
}

// WaitForPodCondition waits for a pod to match the given condition.
func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeout time.Duration, condition podCondition) error {
	e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, desc)
	var lastPodError error
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
func WaitForPodCondition(c clientset.Interface, ns, podName, conditionDesc string, timeout time.Duration, condition podCondition) error {
	e2elog.Logf("Waiting up to %v for pod %q in namespace %q to be %q", timeout, podName, ns, conditionDesc)
	var (
		lastPodError error
		lastPod      *v1.Pod
		start        = time.Now()
	)
	err := wait.PollImmediate(poll, timeout, func() (bool, error) {
		pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
		lastPodError = err
		if err != nil {
			if apierrors.IsNotFound(err) {
				e2elog.Logf("Pod %q in namespace %q not found. Error: %v", podName, ns, err)
			} else {
				e2elog.Logf("Get pod %q in namespace %q failed, ignoring for %v. Error: %v", podName, ns, poll, err)
			}
			continue
			return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName))
		}
		lastPod = pod // Don't overwrite if an error occurs after successfully retrieving.

		// log now so that current pod info is reported before calling `condition()`
		e2elog.Logf("Pod %q: Phase=%q, Reason=%q, readiness=%t. Elapsed: %v",
			podName, pod.Status.Phase, pod.Status.Reason, podutils.IsPodReady(pod), time.Since(start))
		if done, err := condition(pod); done {
			if err == nil {
				e2elog.Logf("Pod %q satisfied condition %q", podName, desc)
				e2elog.Logf("Pod %q satisfied condition %q", podName, conditionDesc)
			}
			return err
			return true, err
		} else if err != nil {
			// TODO(#109732): stop polling and return the error in this case.
			e2elog.Logf("Error evaluating pod condition %s: %v", conditionDesc, err)
		}
		return false, nil
	})
	if err == nil {
		return nil
	}
	if IsTimeout(err) {
		e2elog.Logf("Timed out while waiting for pod %s to be %s. Last observed as: %s",
			podIdentifier(ns, podName), conditionDesc, format.Object(lastPod, 1))
	}
	if lastPodError != nil {
		// If the last API call was an error.
		err = lastPodError
	}
	return maybeTimeoutError(err, "waiting for pod %s to be %s", podIdentifier(ns, podName), conditionDesc)
}

// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
	e2elog.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
	var pods *v1.PodList
	matched := 0
	err := wait.PollImmediate(poll, timeout, func() (done bool, err error) {
		pods, err = c.CoreV1().Pods(ns).List(context.TODO(), opts)
		if err != nil {
			return handleWaitingAPIError(err, true, "listing pods")
		}
		if len(pods.Items) < minPods {
			e2elog.Logf("found %d pods, waiting for at least %d", len(pods.Items), minPods)
			return false, nil
		}

		nonMatchingPods := []string{}
		for _, pod := range pods.Items {
			done, err := condition(&pod)
			if done && err != nil {
				return false, fmt.Errorf("error evaluating pod %s: %w", identifier(&pod), err)
			}
			if !done {
				nonMatchingPods = append(nonMatchingPods, identifier(&pod))
			}
		}
		if apierrors.IsNotFound(lastPodError) {
			// return for compatibility with other functions testing for IsNotFound
			return lastPodError
		matched = len(pods.Items) - len(nonMatchingPods)
		if len(nonMatchingPods) <= 0 {
			return true, nil // All pods match.
		}
		return fmt.Errorf("Gave up after waiting %v for pod %q to be %q", timeout, podName, desc)
		e2elog.Logf("%d pods are not %s: %v", len(nonMatchingPods), conditionDesc, nonMatchingPods)
		return false, nil
	})
	return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
}

// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
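WaitForAllPodsCondition is the new generic list-based waiter that the helpers below delegate to; a hypothetical direct caller (the namespace, selector, and count are made up):

package example

import (
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
)

// waitServersScheduled waits until at least 3 pods labeled app=server are
// bound to a node, reusing the retry and timeout-reporting machinery above.
func waitServersScheduled(c clientset.Interface) (*v1.PodList, error) {
	opts := metav1.ListOptions{LabelSelector: "app=server"}
	return e2epod.WaitForAllPodsCondition(c, "default", opts, 3, "scheduled", 5*time.Minute,
		func(pod *v1.Pod) (bool, error) {
			return pod.Spec.NodeName != "", nil
		})
}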
@@ -242,7 +328,7 @@ func WaitForPodCondition(c clientset.Interface, ns, podName, desc string, timeou
// terminated (reason==""), but may be called to detect if a pod did *not* terminate according to
// the supplied reason.
func WaitForPodTerminatedInNamespace(c clientset.Interface, podName, reason, namespace string) error {
	return WaitForPodCondition(c, namespace, podName, "terminated due to deadline exceeded", podStartTimeout, func(pod *v1.Pod) (bool, error) {
	return WaitForPodCondition(c, namespace, podName, fmt.Sprintf("terminated with reason %s", reason), podStartTimeout, func(pod *v1.Pod) (bool, error) {
		// Only consider Failed pods. Successful pods will be deleted and detected in
		// waitForPodCondition's Get call returning `IsNotFound`
		if pod.Status.Phase == v1.PodFailed {
@@ -295,33 +381,6 @@ func WaitForPodNameUnschedulableInNamespace(c clientset.Interface, podName, name
	})
}

// WaitForMatchPodsCondition finds match pods based on the input ListOptions.
// waits and checks if all match pods are in the given podCondition
func WaitForMatchPodsCondition(c clientset.Interface, opts metav1.ListOptions, desc string, timeout time.Duration, condition podCondition) error {
	e2elog.Logf("Waiting up to %v for matching pods' status to be %s", timeout, desc)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
		pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
		if err != nil {
			return err
		}
		conditionNotMatch := []string{}
		for _, pod := range pods.Items {
			done, err := condition(&pod)
			if done && err != nil {
				return fmt.Errorf("Unexpected error: %v", err)
			}
			if !done {
				conditionNotMatch = append(conditionNotMatch, fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID))
			}
		}
		if len(conditionNotMatch) <= 0 {
			return err
		}
		e2elog.Logf("%d pods are not %s: %v", len(conditionNotMatch), desc, conditionNotMatch)
	}
	return fmt.Errorf("gave up waiting for matching pods to be '%s' after %v", desc, timeout)
}

// WaitForPodNameRunningInNamespace waits default amount of time (PodStartTimeout) for the specified pod to become running.
// Returns an error if timeout occurs first, or pod goes in to failed state.
func WaitForPodNameRunningInNamespace(c clientset.Interface, podName, namespace string) error {
@@ -337,7 +396,15 @@ func WaitForPodRunningInNamespaceSlow(c clientset.Interface, podName, namespace

// WaitTimeoutForPodRunningInNamespace waits the given timeout duration for the specified pod to become running.
func WaitTimeoutForPodRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, podRunning(c, podName, namespace))
	return WaitForPodCondition(c, namespace, podName, "running", timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodRunning:
			return true, nil
		case v1.PodFailed, v1.PodSucceeded:
			return false, errPodCompleted
		}
		return false, nil
	})
}

// WaitForPodRunningInNamespace waits default amount of time (podStartTimeout) for the specified pod to become running.
@@ -351,7 +418,13 @@ func WaitForPodRunningInNamespace(c clientset.Interface, pod *v1.Pod) error {

// WaitTimeoutForPodNoLongerRunningInNamespace waits the given timeout duration for the specified pod to stop.
func WaitTimeoutForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, podCompleted(c, podName, namespace))
	return WaitForPodCondition(c, namespace, podName, "completed", timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodFailed, v1.PodSucceeded:
			return true, nil
		}
		return false, nil
	})
}

// WaitForPodNoLongerRunningInNamespace waits default amount of time (defaultPodDeletionTimeout) for the specified pod to stop running.
@@ -363,14 +436,32 @@ func WaitForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namesp
// WaitTimeoutForPodReadyInNamespace waits the given timeout duration for the
// specified pod to be ready and running.
func WaitTimeoutForPodReadyInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, podRunningAndReady(c, podName, namespace))
	return WaitForPodCondition(c, namespace, podName, "running and ready", timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodFailed, v1.PodSucceeded:
			e2elog.Logf("The phase of Pod %s is %s which is unexpected, pod status: %#v", pod.Name, pod.Status.Phase, pod.Status)
			return false, errPodCompleted
		case v1.PodRunning:
			e2elog.Logf("The phase of Pod %s is %s (Ready = %v)", pod.Name, pod.Status.Phase, podutils.IsPodReady(pod))
			return podutils.IsPodReady(pod), nil
		}
		e2elog.Logf("The phase of Pod %s is %s, waiting for it to be Running (with Ready = true)", pod.Name, pod.Status.Phase)
		return false, nil
	})
}

// WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
// The resourceVersion is used when Watching object changes, it tells since when we care
// about changes to the pod.
func WaitForPodNotPending(c clientset.Interface, ns, podName string) error {
	return wait.PollImmediate(poll, podStartTimeout, podNotPending(c, podName, ns))
	return WaitForPodCondition(c, ns, podName, "not pending", podStartTimeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodPending:
			return false, nil
		default:
			return true, nil
		}
	})
}

// WaitForPodSuccessInNamespace returns nil if the pod reached state success, or an error if it reached failure or until podStartupTimeout.
@@ -388,32 +479,44 @@ func WaitForPodSuccessInNamespaceSlow(c clientset.Interface, podName string, nam
// api returns IsNotFound then the wait stops and nil is returned. If the Get api returns an error other
// than "not found" then that error is returned and the wait stops.
func WaitForPodNotFoundInNamespace(c clientset.Interface, podName, ns string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, func() (bool, error) {
		_, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
	var lastPod *v1.Pod
	err := wait.PollImmediate(poll, timeout, func() (bool, error) {
		pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // done
		}
		if err != nil {
			return true, err // stop wait with error
			return handleWaitingAPIError(err, true, "getting pod %s", podIdentifier(ns, podName))
		}
		lastPod = pod
		return false, nil
	})
	if err == nil {
		return nil
	}
	if IsTimeout(err) {
		e2elog.Logf("Timed out while waiting for pod %s to be Not Found. Last observed as: %s",
			podIdentifier(ns, podName), format.Object(lastPod, 1))
	}
	return maybeTimeoutError(err, "waiting for pod %s not found", podIdentifier(ns, podName))
}

// WaitForPodToDisappear waits the given timeout duration for the specified pod to disappear.
func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labels.Selector, interval, timeout time.Duration) error {
	return wait.PollImmediate(interval, timeout, func() (bool, error) {
	var lastPod *v1.Pod
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		e2elog.Logf("Waiting for pod %s to disappear", podName)
		options := metav1.ListOptions{LabelSelector: label.String()}
		pods, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
		if err != nil {
			return false, err
			return handleWaitingAPIError(err, true, "listing pods")
		}
		found := false
		for _, pod := range pods.Items {
		for i, pod := range pods.Items {
			if pod.Name == podName {
				e2elog.Logf("Pod %s still exists", podName)
				found = true
				lastPod = &(pods.Items[i])
				break
			}
		}
@@ -423,91 +526,65 @@ func WaitForPodToDisappear(c clientset.Interface, ns, podName string, label labe
		}
		return false, nil
	})
	if err == nil {
		return nil
	}
	if IsTimeout(err) {
		e2elog.Logf("Timed out while waiting for pod %s to disappear. Last observed as: %s",
			podIdentifier(ns, podName), format.Object(lastPod, 1))
	}
	return maybeTimeoutError(err, "waiting for pod %s to disappear", podIdentifier(ns, podName))
}

// PodsResponding waits for the pods to respond.
func PodsResponding(c clientset.Interface, ns, name string, wantName bool, pods *v1.PodList) error {
	ginkgo.By("trying to dial each unique pod")
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
	return wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
	err := wait.PollImmediate(poll, podRespondingTimeout, NewProxyResponseChecker(c, ns, label, name, wantName, pods).CheckAllResponses)
	return maybeTimeoutError(err, "waiting for pods to be responsive")
}

// WaitForNumberOfPods waits up to timeout to ensure there are exactly
// `num` pods in namespace `ns`.
// It returns the matching Pods or a timeout error.
func WaitForNumberOfPods(c clientset.Interface, ns string, num int, timeout time.Duration) (pods *v1.PodList, err error) {
	actualNum := 0
	err = wait.PollImmediate(poll, timeout, func() (bool, error) {
		pods, err = c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
		// ignore intermittent network error
		if err != nil {
			return false, nil
			return handleWaitingAPIError(err, false, "listing pods")
		}
		return len(pods.Items) == num, nil
		actualNum = len(pods.Items)
		return actualNum == num, nil
	})
	return
	return pods, maybeTimeoutError(err, "waiting for there to be exactly %d pods in namespace (last seen %d)", num, actualNum)
}

// WaitForPodsWithLabelScheduled waits for all matching pods to become scheduled and at least one
// matching pod exists. Return the list of matching pods.
func WaitForPodsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {
	err = wait.PollImmediate(poll, podScheduledBeforeTimeout,
		func() (bool, error) {
			pods, err = WaitForPodsWithLabel(c, ns, label)
			if err != nil {
				return false, err
			}
			for _, pod := range pods.Items {
	opts := metav1.ListOptions{LabelSelector: label.String()}
	return WaitForAllPodsCondition(c, ns, opts, 1, "scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) {
		if pod.Spec.NodeName == "" {
			return false, nil
		}
			}
			return true, nil
	})
	return pods, err
}

// WaitForPodsWithLabel waits up to podListTimeout for getting pods with a certain label
func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {
	for t := time.Now(); time.Since(t) < podListTimeout; time.Sleep(poll) {
		options := metav1.ListOptions{LabelSelector: label.String()}
		pods, err = c.CoreV1().Pods(ns).List(context.TODO(), options)
		if err != nil {
			return
		}
		if len(pods.Items) > 0 {
			break
		}
	}
	if pods == nil || len(pods.Items) == 0 {
		err = fmt.Errorf("Timeout while waiting for pods with label %v", label)
	}
	return
func WaitForPodsWithLabel(c clientset.Interface, ns string, label labels.Selector) (*v1.PodList, error) {
	opts := metav1.ListOptions{LabelSelector: label.String()}
	return WaitForAllPodsCondition(c, ns, opts, 1, "existent", podListTimeout, func(pod *v1.Pod) (bool, error) {
		return true, nil
	})
}

// WaitForPodsWithLabelRunningReady waits for an exact amount of matching pods to become running and ready.
// Return the list of matching pods.
func WaitForPodsWithLabelRunningReady(c clientset.Interface, ns string, label labels.Selector, num int, timeout time.Duration) (pods *v1.PodList, err error) {
	var current int
	err = wait.Poll(poll, timeout,
		func() (bool, error) {
			pods, err = WaitForPodsWithLabel(c, ns, label)
			if err != nil {
				e2elog.Logf("Failed to list pods: %v", err)
				return false, err
			}
			current = 0
			for _, pod := range pods.Items {
				if flag, err := testutils.PodRunningReady(&pod); err == nil && flag == true {
					current++
				}
			}
			if current != num {
				e2elog.Logf("Got %v pods running and ready, expect: %v", current, num)
				return false, nil
			}
			return true, nil
		})
	return pods, err
	opts := metav1.ListOptions{LabelSelector: label.String()}
	return WaitForAllPodsCondition(c, ns, opts, 1, "running and ready", podListTimeout, testutils.PodRunningReady)
}

// WaitForNRestartablePods tries to list restarting pods using ps until it finds expect of them,
@@ -540,33 +617,101 @@ func WaitForNRestartablePods(ps *testutils.PodStore, expect int, timeout time.Du
// invalid container configuration. In this case, the container will remain in a waiting state with a specific
// reason set, which should match the given reason.
func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason))
	conditionDesc := fmt.Sprintf("container %d failed with reason %s", containerIndex, reason)
	return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodPending:
			if len(pod.Status.ContainerStatuses) == 0 {
				return false, nil
			}
			containerStatus := pod.Status.ContainerStatuses[containerIndex]
			if containerStatus.State.Waiting != nil && containerStatus.State.Waiting.Reason == reason {
				return true, nil
			}
			return false, nil
		case v1.PodFailed, v1.PodRunning, v1.PodSucceeded:
			return false, fmt.Errorf("pod was expected to be pending, but it is in the state: %s", pod.Status.Phase)
		}
		return false, nil
	})
}

// WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe.
func WaitForPodContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, podContainerStarted(c, namespace, podName, containerIndex))
	conditionDesc := fmt.Sprintf("container %d started", containerIndex)
	return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		if containerIndex > len(pod.Status.ContainerStatuses)-1 {
			return false, nil
		}
		containerStatus := pod.Status.ContainerStatuses[containerIndex]
		return *containerStatus.Started, nil
	})
}

// WaitForPodFailedReason waits for a pod failure reason in status, for example "SysctlForbidden".
func WaitForPodFailedReason(c clientset.Interface, pod *v1.Pod, reason string, timeout time.Duration) error {
	waitErr := wait.PollImmediate(poll, timeout, func() (bool, error) {
		pod, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
	conditionDesc := fmt.Sprintf("failed with reason %s", reason)
	return WaitForPodCondition(c, pod.Namespace, pod.Name, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		switch pod.Status.Phase {
		case v1.PodSucceeded:
			return true, errors.New("pod succeeded unexpectedly")
		case v1.PodFailed:
			if pod.Status.Reason == reason {
				return true, nil
			} else {
				return true, fmt.Errorf("pod failed with reason %s", pod.Status.Reason)
			}
		}
		return false, nil
	})
	if waitErr != nil {
		return fmt.Errorf("error waiting for pod failure status: %v", waitErr)
	}
	return nil
}

// WaitForContainerRunning waits for the given Pod container to have a state of running
func WaitForContainerRunning(c clientset.Interface, namespace, podName, containerName string, timeout time.Duration) error {
	return wait.PollImmediate(poll, timeout, isContainerRunning(c, namespace, podName, containerName))
	conditionDesc := fmt.Sprintf("container %s running", containerName)
	return WaitForPodCondition(c, namespace, podName, conditionDesc, timeout, func(pod *v1.Pod) (bool, error) {
		for _, statuses := range [][]v1.ContainerStatus{pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses} {
			for _, cs := range statuses {
				if cs.Name == containerName {
					return cs.State.Running != nil, nil
				}
			}
		}
		return false, nil
	})
}

// handleWaitingAPIError handles an error from an API request in the context of a Wait function.
// If the error is retryable, sleep the recommended delay and ignore the error.
// If the error is terminal, return it.
func handleWaitingAPIError(err error, retryNotFound bool, taskFormat string, taskArgs ...interface{}) (bool, error) {
	taskDescription := fmt.Sprintf(taskFormat, taskArgs...)
	if retryNotFound && apierrors.IsNotFound(err) {
		e2elog.Logf("Ignoring NotFound error while " + taskDescription)
		return false, nil
	}
	if retry, delay := shouldRetry(err); retry {
		e2elog.Logf("Retryable error while %s, retrying after %v: %v", taskDescription, delay, err)
		if delay > 0 {
			time.Sleep(delay)
		}
		return false, nil
	}
	e2elog.Logf("Encountered non-retryable error while %s: %v", taskDescription, err)
	return false, err
}

// Decide whether to retry an API request. Optionally include a delay to retry after.
func shouldRetry(err error) (retry bool, retryAfter time.Duration) {
	// if the error sends the Retry-After header, we respect it as an explicit confirmation we should retry.
	if delay, shouldRetry := apierrors.SuggestsClientDelay(err); shouldRetry {
		return shouldRetry, time.Duration(delay) * time.Second
	}

	// these errors indicate a transient error that should be retried.
	if apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) {
		return true, 0
	}

	return false, 0
}
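To make the retry classification concrete, a sketch of how shouldRetry treats two representative apimachinery errors (written as if in the same package, since the helper is unexported):

package pod

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// demoShouldRetry: a 429 carries a server-suggested delay and is retried;
// a NotFound is terminal here (handleWaitingAPIError may still ignore it
// when the caller sets retryNotFound).
func demoShouldRetry() {
	tooMany := apierrors.NewTooManyRequests("slow down", 5) // Retry-After: 5 seconds
	retry, delay := shouldRetry(tooMany)
	fmt.Println(retry, delay) // true 5s

	notFound := apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "server")
	retry, delay = shouldRetry(notFound)
	fmt.Println(retry, delay) // false 0s
}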