e2e: consolidate checking a pod list

WaitForPods is now a generic function which lists pods and then checks the pods
that it found against some provided condition. A parameter determines how many
pods must be found resp. match the condition for the check to succeed.
This commit is contained in:
Patrick Ohly 2023-01-20 12:04:24 +01:00
parent d8428c6fb1
commit 45d4631069
2 changed files with 113 additions and 101 deletions

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/util/podutils"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/format"
@ -277,112 +278,134 @@ func WaitForPodCondition(ctx context.Context, c clientset.Interface, ns, podName
}))
}
// WaitForAllPodsCondition waits for the listed pods to match the given condition.
// To succeed, at least minPods must be listed, and all listed pods must match the condition.
func WaitForAllPodsCondition(ctx context.Context, c clientset.Interface, ns string, opts metav1.ListOptions, minPods int, conditionDesc string, timeout time.Duration, condition podCondition) (*v1.PodList, error) {
framework.Logf("Waiting up to %v for at least %d pods in namespace %s to be %s", timeout, minPods, ns, conditionDesc)
var pods *v1.PodList
matched := 0
err := wait.PollImmediateWithContext(ctx, framework.PollInterval(), timeout, func(ctx context.Context) (done bool, err error) {
pods, err = c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
// Range determines how many items must exist and how many must match a certain
// condition. Values <= 0 are ignored.
// TODO (?): move to test/e2e/framework/range
type Range struct {
// MinMatching must be <= actual matching items or <= 0.
MinMatching int
// MaxMatching must be >= actual matching items or <= 0.
// To check for "no matching items", set NonMatching.
MaxMatching int
// NoneMatching indicates that no item must match.
NoneMatching bool
// AllMatching indicates that all items must match.
AllMatching bool
// MinFound must be <= existing items or <= 0.
MinFound int
}
// Min returns how many items must exist.
func (r Range) Min() int {
min := r.MinMatching
if min < r.MinFound {
min = r.MinFound
}
return min
}
// WaitForPods waits for pods in the given namespace to match the given
// condition. How many pods must exist and how many must match the condition
// is determined by the range parameter. The condition callback may use
// gomega.StopTrying(...).Now() to abort early. The condition description
// will be used with "expected pods to <description>".
func WaitForPods(ctx context.Context, c clientset.Interface, ns string, opts metav1.ListOptions, r Range, timeout time.Duration, conditionDesc string, condition func(*v1.Pod) bool) (*v1.PodList, error) {
var finalPods *v1.PodList
minPods := r.Min()
match := func(pods *v1.PodList) (func() string, error) {
finalPods = pods
if len(pods.Items) < minPods {
framework.Logf("found %d pods, waiting for at least %d", len(pods.Items), minPods)
return false, nil
return func() string {
return fmt.Sprintf("expected at least %d pods, only got %d", minPods, len(pods.Items))
}, nil
}
nonMatchingPods := []string{}
var nonMatchingPods, matchingPods []v1.Pod
for _, pod := range pods.Items {
done, err := condition(&pod)
if done && err != nil {
return false, fmt.Errorf("error evaluating pod %s: %w", identifier(&pod), err)
}
if !done {
nonMatchingPods = append(nonMatchingPods, identifier(&pod))
if condition(&pod) {
matchingPods = append(matchingPods, pod)
} else {
nonMatchingPods = append(nonMatchingPods, pod)
}
}
matched = len(pods.Items) - len(nonMatchingPods)
if len(nonMatchingPods) <= 0 {
return true, nil // All pods match.
matching := len(pods.Items) - len(nonMatchingPods)
if matching < r.MinMatching && r.MinMatching > 0 {
return func() string {
return fmt.Sprintf("expected at least %d pods to %s, %d out of %d were not:\n%s",
r.MinMatching, conditionDesc, len(nonMatchingPods), len(pods.Items),
format.Object(nonMatchingPods, 1))
}, nil
}
framework.Logf("%d pods are not %s: %v", len(nonMatchingPods), conditionDesc, nonMatchingPods)
return false, nil
})
return pods, maybeTimeoutError(err, "waiting for at least %d pods to be %s (matched %d)", minPods, conditionDesc, matched)
if len(nonMatchingPods) > 0 && r.AllMatching {
return func() string {
return fmt.Sprintf("expected all pods to %s, %d out of %d were not:\n%s",
conditionDesc, len(nonMatchingPods), len(pods.Items),
format.Object(nonMatchingPods, 1))
}, nil
}
if matching > r.MaxMatching && r.MaxMatching > 0 {
return func() string {
return fmt.Sprintf("expected at most %d pods to %s, %d out of %d were:\n%s",
r.MinMatching, conditionDesc, len(matchingPods), len(pods.Items),
format.Object(matchingPods, 1))
}, nil
}
if matching > 0 && r.NoneMatching {
return func() string {
return fmt.Sprintf("expected no pods to %s, %d out of %d were:\n%s",
conditionDesc, len(matchingPods), len(pods.Items),
format.Object(matchingPods, 1))
}, nil
}
return nil, nil
}
err := framework.Gomega().
Eventually(ctx, framework.ListObjects(c.CoreV1().Pods(ns).List, opts)).
WithTimeout(timeout).
Should(framework.MakeMatcher(match))
return finalPods, err
}
// RunningReady checks whether pod p's phase is running and it has a ready
// condition of status true.
func RunningReady(p *v1.Pod) bool {
return p.Status.Phase == v1.PodRunning && podutil.IsPodReady(p)
}
// WaitForPodsRunning waits for a given `timeout` to evaluate if a certain amount of pods in given `ns` are running.
func WaitForPodsRunning(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(framework.PollInterval(), timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if ready, _ := testutils.PodRunningReady(&pod); ready {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods are running, but got %v", num, matched)
return false, nil
_, err := WaitForPods(context.TODO(), c, ns, metav1.ListOptions{}, Range{MinMatching: num, MaxMatching: num}, timeout,
"be running and ready", func(pod *v1.Pod) bool {
ready, _ := testutils.PodRunningReady(pod)
return ready
})
return maybeTimeoutError(err, "waiting for pods to be running (want %v, matched %d)", num, matched)
return err
}
// WaitForPodsSchedulingGated waits for a given `timeout` to evaluate if a certain amount of pods in given `ns` stay in scheduling gated state.
func WaitForPodsSchedulingGated(c clientset.Interface, ns string, num int, timeout time.Duration) error {
matched := 0
err := wait.PollImmediate(framework.PollInterval(), timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
_, err := WaitForPods(context.TODO(), c, ns, metav1.ListOptions{}, Range{MinMatching: num, MaxMatching: num}, timeout,
"be in scheduling gated state", func(pod *v1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodScheduled && condition.Reason == v1.PodReasonSchedulingGated {
matched++
return true
}
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods in scheduling gated state, but got %v", num, matched)
return false, nil
return false
})
return maybeTimeoutError(err, "waiting for pods to be scheduling gated (want %d, matched %d)", num, matched)
return err
}
// WaitForPodsWithSchedulingGates waits for a given `timeout` to evaluate if a certain amount of pods in given `ns`
// match the given `schedulingGates`stay in scheduling gated state.
func WaitForPodsWithSchedulingGates(c clientset.Interface, ns string, num int, timeout time.Duration, schedulingGates []v1.PodSchedulingGate) error {
matched := 0
err := wait.PollImmediate(framework.PollInterval(), timeout, func() (done bool, err error) {
pods, err := c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, true, "listing pods")
}
matched = 0
for _, pod := range pods.Items {
if reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates) {
matched++
}
}
if matched == num {
return true, nil
}
framework.Logf("expect %d pods carry the expected scheduling gates, but got %v", num, matched)
return false, nil
_, err := WaitForPods(context.TODO(), c, ns, metav1.ListOptions{}, Range{MinMatching: num, MaxMatching: num}, timeout,
"have certain scheduling gates", func(pod *v1.Pod) bool {
return reflect.DeepEqual(pod.Spec.SchedulingGates, schedulingGates)
})
return maybeTimeoutError(err, "waiting for pods to carry the expected scheduling gates (want %d, matched %d)", num, matched)
return err
}
// WaitForPodTerminatedInNamespace returns an error if it takes too long for the pod to terminate,
@ -616,35 +639,25 @@ func PodsResponding(ctx context.Context, c clientset.Interface, ns, name string,
// `num` pods in namespace `ns`.
// It returns the matching Pods or a timeout error.
func WaitForNumberOfPods(ctx context.Context, c clientset.Interface, ns string, num int, timeout time.Duration) (pods *v1.PodList, err error) {
actualNum := 0
err = wait.PollImmediateWithContext(ctx, framework.PollInterval(), timeout, func(ctx context.Context) (bool, error) {
pods, err = c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return handleWaitingAPIError(err, false, "listing pods")
}
actualNum = len(pods.Items)
return actualNum == num, nil
return WaitForPods(ctx, c, ns, metav1.ListOptions{}, Range{MinMatching: num, MaxMatching: num}, podScheduledBeforeTimeout, "exist", func(pod *v1.Pod) bool {
return true
})
return pods, maybeTimeoutError(err, "waiting for there to be exactly %d pods in namespace (last seen %d)", num, actualNum)
}
// WaitForPodsWithLabelScheduled waits for all matching pods to become scheduled and at least one
// matching pod exists. Return the list of matching pods.
func WaitForPodsWithLabelScheduled(ctx context.Context, c clientset.Interface, ns string, label labels.Selector) (pods *v1.PodList, err error) {
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(ctx, c, ns, opts, 1, "scheduled", podScheduledBeforeTimeout, func(pod *v1.Pod) (bool, error) {
if pod.Spec.NodeName == "" {
return false, nil
}
return true, nil
return WaitForPods(ctx, c, ns, opts, Range{MinFound: 1, AllMatching: true}, podScheduledBeforeTimeout, "be scheduled", func(pod *v1.Pod) bool {
return pod.Spec.NodeName != ""
})
}
// WaitForPodsWithLabel waits up to podListTimeout for getting pods with certain label
func WaitForPodsWithLabel(ctx context.Context, c clientset.Interface, ns string, label labels.Selector) (*v1.PodList, error) {
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(ctx, c, ns, opts, 1, "existent", podListTimeout, func(pod *v1.Pod) (bool, error) {
return true, nil
return WaitForPods(ctx, c, ns, opts, Range{MinFound: 1}, podListTimeout, "exist", func(pod *v1.Pod) bool {
return true
})
}
@ -652,7 +665,7 @@ func WaitForPodsWithLabel(ctx context.Context, c clientset.Interface, ns string,
// Return the list of matching pods.
func WaitForPodsWithLabelRunningReady(ctx context.Context, c clientset.Interface, ns string, label labels.Selector, num int, timeout time.Duration) (pods *v1.PodList, err error) {
opts := metav1.ListOptions{LabelSelector: label.String()}
return WaitForAllPodsCondition(ctx, c, ns, opts, 1, "running and ready", timeout, testutils.PodRunningReady)
return WaitForPods(ctx, c, ns, opts, Range{MinFound: num, AllMatching: true}, timeout, "be running and ready", RunningReady)
}
// WaitForNRestartablePods tries to list restarting pods using ps until it finds expect of them,

View File

@ -35,7 +35,6 @@ import (
"k8s.io/kubernetes/test/e2e/storage/drivers"
storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
"k8s.io/kubernetes/test/e2e/storage/utils"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
)
@ -119,7 +118,7 @@ var _ = utils.SIGDescribe("[Feature:NodeOutOfServiceVolumeDetach] [Disruptive] [
LabelSelector: labelSelectorStr,
FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", oldNodeName).String(),
}
_, err = e2epod.WaitForAllPodsCondition(ctx, c, ns, podListOpts, 1, "running and ready", framework.PodStartTimeout, testutils.PodRunningReady)
_, err = e2epod.WaitForPods(ctx, c, ns, podListOpts, e2epod.Range{MinMatching: 1}, framework.PodStartTimeout, "be running and ready", e2epod.RunningReady)
framework.ExpectNoError(err)
// Bring the node back online and remove the taint