e2e pod: convert ProxyResponseChecker into matcher

Instead of pod responses being printed to the log each time polling fails, we
get a consolidated failure message with all unexpected pod responses if (and
only if) the check times out or a progress report gets produced.
This commit is contained in:
Patrick Ohly 2023-01-20 14:38:25 +01:00
parent c3266cde77
commit 1e346c4e4a
4 changed files with 92 additions and 101 deletions

View File

@ -493,9 +493,7 @@ var _ = SIGDescribe("ServiceAccounts", func() {
framework.ExpectNoError(err)
framework.Logf("created pod")
if !e2epod.CheckPodsRunningReady(ctx, f.ClientSet, f.Namespace.Name, []string{pod.Name}, time.Minute) {
framework.Failf("pod %q in ns %q never became ready", pod.Name, f.Namespace.Name)
}
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, time.Minute))
framework.Logf("pod is ready")

View File

@ -30,7 +30,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
@ -60,95 +59,6 @@ func expectNoErrorWithOffset(offset int, err error, explain ...interface{}) {
gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...)
}
func isElementOf(podUID types.UID, pods *v1.PodList) bool {
for _, pod := range pods.Items {
if pod.UID == podUID {
return true
}
}
return false
}
// ProxyResponseChecker is a context for checking pods responses by issuing GETs to them (via the API
// proxy) and verifying that they answer with their own pod name.
type ProxyResponseChecker struct {
c clientset.Interface
ns string
label labels.Selector
controllerName string
respondName bool // Whether the pod should respond with its own name.
pods *v1.PodList
}
// NewProxyResponseChecker returns a context for checking pods responses.
func NewProxyResponseChecker(c clientset.Interface, ns string, label labels.Selector, controllerName string, respondName bool, pods *v1.PodList) ProxyResponseChecker {
return ProxyResponseChecker{c, ns, label, controllerName, respondName, pods}
}
// CheckAllResponses issues GETs to all pods in the context and verify they
// reply with their own pod name.
func (r ProxyResponseChecker) CheckAllResponses(ctx context.Context) (done bool, err error) {
successes := 0
options := metav1.ListOptions{LabelSelector: r.label.String()}
currentPods, err := r.c.CoreV1().Pods(r.ns).List(ctx, options)
expectNoError(err, "Failed to get list of currentPods in namespace: %s", r.ns)
for i, pod := range r.pods.Items {
// Check that the replica list remains unchanged, otherwise we have problems.
if !isElementOf(pod.UID, currentPods) {
return false, fmt.Errorf("pod with UID %s is no longer a member of the replica set. Must have been restarted for some reason. Current replica set: %v", pod.UID, currentPods)
}
ctxUntil, cancel := context.WithTimeout(ctx, singleCallTimeout)
defer cancel()
body, err := r.c.CoreV1().RESTClient().Get().
Namespace(r.ns).
Resource("pods").
SubResource("proxy").
Name(string(pod.Name)).
Do(ctxUntil).
Raw()
if err != nil {
if ctxUntil.Err() != nil {
// We may encounter errors here because of a race between the pod readiness and apiserver
// proxy. So, we log the error and retry if this occurs.
framework.Logf("Controller %s: Failed to Get from replica %d [%s]: %v\n pod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
return false, nil
}
framework.Logf("Controller %s: Failed to GET from replica %d [%s]: %v\npod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
continue
}
// The response checker expects the pod's name unless !respondName, in
// which case it just checks for a non-empty response.
got := string(body)
what := ""
if r.respondName {
what = "expected"
want := pod.Name
if got != want {
framework.Logf("Controller %s: Replica %d [%s] expected response %q but got %q",
r.controllerName, i+1, pod.Name, want, got)
continue
}
} else {
what = "non-empty"
if len(got) == 0 {
framework.Logf("Controller %s: Replica %d [%s] expected non-empty response",
r.controllerName, i+1, pod.Name)
continue
}
}
successes++
framework.Logf("Controller %s: Got %s result from replica %d [%s]: %q, %d of %d required successes so far",
r.controllerName, what, i+1, pod.Name, got, successes, len(r.pods.Items))
}
if successes < len(r.pods.Items) {
return false, nil
}
return true, nil
}
// PodsCreated returns a pod list matched by the given name.
func PodsCreated(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) (*v1.PodList, error) {
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))

View File

@ -34,6 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/util/podutils"
@ -588,9 +589,93 @@ func WaitForPodsResponding(ctx context.Context, c clientset.Interface, ns string
}
ginkgo.By("trying to dial each unique pod")
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": controllerName}))
options := metav1.ListOptions{LabelSelector: label.String()}
err := wait.PollImmediateWithContext(ctx, framework.PollInterval(), timeout, NewProxyResponseChecker(c, ns, label, controllerName, wantName, pods).CheckAllResponses)
return maybeTimeoutError(err, "waiting for pods to be responsive")
type response struct {
podName string
response string
}
get := func(ctx context.Context) ([]response, error) {
currentPods, err := c.CoreV1().Pods(ns).List(ctx, options)
if err != nil {
return nil, fmt.Errorf("list pods: %w", err)
}
var responses []response
for _, pod := range pods.Items {
// Check that the replica list remains unchanged, otherwise we have problems.
if !isElementOf(pod.UID, currentPods) {
return nil, gomega.StopTrying(fmt.Sprintf("Pod with UID %s is no longer a member of the replica set. Must have been restarted for some reason.\nCurrent replica set:\n%s", pod.UID, format.Object(currentPods, 1)))
}
ctxUntil, cancel := context.WithTimeout(ctx, singleCallTimeout)
defer cancel()
body, err := c.CoreV1().RESTClient().Get().
Namespace(ns).
Resource("pods").
SubResource("proxy").
Name(string(pod.Name)).
Do(ctxUntil).
Raw()
if err != nil {
// We may encounter errors here because of a race between the pod readiness and apiserver
// proxy. So, we log the error and retry if this occurs.
return nil, fmt.Errorf("Controller %s: failed to Get from replica pod %s:\n%s\nPod status:\n%s",
controllerName, pod.Name,
format.Object(err, 1), format.Object(pod.Status, 1))
}
responses = append(responses, response{podName: pod.Name, response: string(body)})
}
return responses, nil
}
match := func(responses []response) (func() string, error) {
// The response checker expects the pod's name unless !respondName, in
// which case it just checks for a non-empty response.
var unexpected []response
for _, response := range responses {
if wantName {
if response.response != response.podName {
unexpected = append(unexpected, response)
}
} else {
if len(response.response) == 0 {
unexpected = append(unexpected, response)
}
}
}
if len(unexpected) > 0 {
return func() string {
what := "some response"
if wantName {
what = "the pod's own name as response"
}
return fmt.Sprintf("Wanted %s, but the following pods replied with something else:\n%s", what, format.Object(unexpected, 1))
}, nil
}
return nil, nil
}
err := framework.Gomega().
Eventually(ctx, framework.HandleRetry(get)).
WithTimeout(timeout).
Should(framework.MakeMatcher(match))
if err != nil {
return fmt.Errorf("checking pod responses: %w", err)
}
return nil
}
func isElementOf(podUID apitypes.UID, pods *v1.PodList) bool {
for _, pod := range pods.Items {
if pod.UID == podUID {
return true
}
}
return false
}
// WaitForNumberOfPods waits up to timeout to ensure there are exact

View File

@ -411,7 +411,7 @@ var _ = SIGDescribe("Kubectl client", func() {
ginkgo.By(fmt.Sprintf("creating the pod from %v", podYaml))
podYaml = commonutils.SubstituteImageName(string(readTestFileOrDie("pod-with-readiness-probe.yaml.in")))
e2ekubectl.RunKubectlOrDieInput(ns, podYaml, "create", "-f", "-")
framework.ExpectEqual(e2epod.CheckPodsRunningReady(ctx, c, ns, []string{simplePodName}, framework.PodStartTimeout), true)
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, c, simplePodName, ns, framework.PodStartTimeout))
})
ginkgo.AfterEach(func() {
cleanupKubectlInputs(podYaml, ns, simplePodSelector)
@ -798,9 +798,7 @@ metadata:
g := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(controller.ActivePods(pods)) }
runTestPod, _, err := polymorphichelpers.GetFirstPod(f.ClientSet.CoreV1(), ns, "run=run-test-3", 1*time.Minute, g)
framework.ExpectNoError(err)
if !e2epod.CheckPodsRunningReady(ctx, c, ns, []string{runTestPod.Name}, time.Minute) {
framework.Failf("Pod %q of Job %q should still be running", runTestPod.Name, "run-test-3")
}
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, c, runTestPod.Name, ns, time.Minute))
gomega.Expect(c.CoreV1().Pods(ns).Delete(ctx, "run-test-3", metav1.DeleteOptions{})).To(gomega.BeNil())
})
@ -1500,7 +1498,7 @@ metadata:
ginkgo.By("creating the pod")
podYaml = commonutils.SubstituteImageName(string(readTestFileOrDie("pause-pod.yaml.in")))
e2ekubectl.RunKubectlOrDieInput(ns, podYaml, "create", "-f", "-")
framework.ExpectEqual(e2epod.CheckPodsRunningReady(ctx, c, ns, []string{pausePodName}, framework.PodStartTimeout), true)
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, c, pausePodName, ns, framework.PodStartTimeout))
})
ginkgo.AfterEach(func() {
cleanupKubectlInputs(podYaml, ns, pausePodSelector)
@ -1539,7 +1537,7 @@ metadata:
ginkgo.By("creating the pod")
podYaml = commonutils.SubstituteImageName(string(readTestFileOrDie("busybox-pod.yaml.in")))
e2ekubectl.RunKubectlOrDieInput(ns, podYaml, "create", "-f", "-")
framework.ExpectEqual(e2epod.CheckPodsRunningReady(ctx, c, ns, []string{busyboxPodName}, framework.PodStartTimeout), true)
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, c, busyboxPodName, ns, framework.PodStartTimeout))
})
ginkgo.AfterEach(func() {
cleanupKubectlInputs(podYaml, ns, busyboxPodSelector)