From f32648d81013ac56ad529eb262a5cf1c18ff2273 Mon Sep 17 00:00:00 2001
From: wojtekt
Date: Fri, 19 Jun 2020 15:30:37 +0200
Subject: [PATCH] Attempt to deflake conntrack e2e tests

---
 test/e2e/network/conntrack.go | 89 ++++++++++++++++++++++-------------
 1 file changed, 57 insertions(+), 32 deletions(-)

diff --git a/test/e2e/network/conntrack.go b/test/e2e/network/conntrack.go
index 583b212a143..c0ed1d274b5 100644
--- a/test/e2e/network/conntrack.go
+++ b/test/e2e/network/conntrack.go
@@ -25,6 +25,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -75,6 +76,20 @@ var _ = SIGDescribe("Conntrack", func() {
 		clientNodeInfo, serverNodeInfo nodeInfo
 	)
 
+	logContainsFn := func(text string) wait.ConditionFunc {
+		return func() (bool, error) {
+			logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+			if err != nil {
+				// Retry the error next time.
+				return false, nil
+			}
+			if !strings.Contains(string(logs), text) {
+				return false, nil
+			}
+			return true, nil
+		}
+	}
+
 	ginkgo.BeforeEach(func() {
 		cs = fr.ClientSet
 		ns = fr.Namespace.Name
@@ -117,7 +132,7 @@ var _ = SIGDescribe("Conntrack", func() {
 		ginkgo.By("creating a client pod for probing the service " + serviceName)
 		clientPod := newAgnhostPod(podClient, "")
 		clientPod.Spec.NodeName = clientNodeInfo.name
-		cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort)
+		cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort)
 		clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
 		clientPod.Spec.Containers[0].Name = podClient
 		fr.PodClient().CreateSync(clientPod)
@@ -138,15 +153,17 @@ var _ = SIGDescribe("Conntrack", func() {
 		err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
 		framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
 
-		// Check that the pod receives the traffic
-		// UDP conntrack entries timeout is 30 sec by default
+		// Note that the fact that the Endpoints object already exists does NOT mean
+		// that iptables (or whatever else is used) was already programmed.
+		// Additionally take into account that UDP conntrack entries timeout is
+		// 30 seconds by default.
+		// Based on the above, check if the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) - time.Sleep(30 * time.Second) - logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) - framework.ExpectNoError(err) - framework.Logf("Pod client logs: %s", logs) - if !strings.Contains(string(logs), podBackend1) { - framework.Failf("Failed to connecto to backend 1") + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1)); err != nil { + logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 1") } // Create a second pod @@ -160,21 +177,24 @@ var _ = SIGDescribe("Conntrack", func() { framework.Logf("Cleaning up %s pod", podBackend1) fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + // Waiting for service to expose endpoint. + err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend2: {80}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) + // Check that the second pod keeps receiving traffic // UDP conntrack entries timeout is 30 sec by default ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) - time.Sleep(30 * time.Second) - logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) - framework.ExpectNoError(err) - framework.Logf("Pod client logs: %s", logs) - if !strings.Contains(string(logs), podBackend2) { - framework.Failf("Failed to connecto to backend 2") + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2)); err != nil { + logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 2") } }) ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a ClusterIP service", func() { - // Create a NodePort service + // Create a ClusterIP service udpJig := e2eservice.NewTestJig(cs, ns, serviceName) ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns) udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) { @@ -185,11 +205,11 @@ var _ = SIGDescribe("Conntrack", func() { }) framework.ExpectNoError(err) - // Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds + // Create a pod in one node to create the UDP traffic against the ClusterIP service every 5 seconds ginkgo.By("creating a client pod for probing the service " + serviceName) clientPod := newAgnhostPod(podClient, "") clientPod.Spec.NodeName = clientNodeInfo.name - cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port) clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} clientPod.Spec.Containers[0].Name = podClient fr.PodClient().CreateSync(clientPod) @@ -210,15 +230,17 @@ var _ = SIGDescribe("Conntrack", func() { err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}}) framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: 
%s", serviceName, ns) - // Check that the pod receives the traffic - // UDP conntrack entries timeout is 30 sec by default + // Note that the fact that Endpoints object already exists, does NOT mean + // that iptables (or whatever else is used) was already programmed. + // Additionally take into account that UDP conntract entries timeout is + // 30 seconds by default. + // Based on the above check if the pod receives the traffic. ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) - time.Sleep(30 * time.Second) - logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) - framework.ExpectNoError(err) - framework.Logf("Pod client logs: %s", logs) - if !strings.Contains(string(logs), podBackend1) { - framework.Failf("Failed to connecto to backend 1") + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1)); err != nil { + logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 1") } // Create a second pod @@ -232,15 +254,18 @@ var _ = SIGDescribe("Conntrack", func() { framework.Logf("Cleaning up %s pod", podBackend1) fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + // Waiting for service to expose endpoint. + err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend2: {80}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) + // Check that the second pod keeps receiving traffic // UDP conntrack entries timeout is 30 sec by default ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) - time.Sleep(30 * time.Second) - logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) - framework.ExpectNoError(err) - framework.Logf("Pod client logs: %s", logs) - if !strings.Contains(string(logs), podBackend2) { - framework.Failf("Failed to connecto to backend 2") + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2)); err != nil { + logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) + framework.ExpectNoError(err) + framework.Logf("Pod client logs: %s", logs) + framework.Failf("Failed to connect to backend 2") } }) })