diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index 949d9331967..c7b09934ca1 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -518,6 +518,123 @@ func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
 	return ret
 }
 
+// continuousEcho uses a single connection for multiple requests. It is meant to run as a
+// goroutine so that the service and backend pods can be manipulated while the connection is ongoing.
+// It starts by sending a series of packets to establish conntrack entries, then waits for a signal
+// before sending more packets. It sends an error on errorChannel if the number of failures reaches the threshold (10).
+func continuousEcho(host string, port int, timeout time.Duration, maxAttempts int, signal chan struct{}, errorChannel chan error) {
+	defer ginkgo.GinkgoRecover()
+	const threshold = 10
+
+	// Sanity check inputs, because it has happened. These are the only things
+	// that should hard fail the test - they are basically ASSERT()s.
+	if host == "" {
+		errorChannel <- fmt.Errorf("got empty host for continuous echo (%s)", host)
+		return
+	}
+	if port == 0 {
+		errorChannel <- fmt.Errorf("got port == 0 for continuous echo (%d)", port)
+		return
+	}
+
+	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
+	url := fmt.Sprintf("udp://%s", hostPort)
+
+	ret := UDPPokeResult{}
+
+	con, err := net.Dial("udp", hostPort)
+	if err != nil {
+		ret.Status = UDPError
+		ret.Error = err
+		errorChannel <- fmt.Errorf("connection to %q failed: %v", url, err)
+		return
+	}
+
+	numErrors := 0
+	bufsize := len(strconv.Itoa(maxAttempts)) + 1
+	var buf = make([]byte, bufsize)
+
+	for i := 0; i < maxAttempts; i++ {
+		if i == threshold {
+			framework.Logf("Continuous echo waiting for signal to continue")
+			<-signal
+			if numErrors == threshold {
+				errorChannel <- fmt.Errorf("continuous echo was not able to communicate with initial server pod")
+				return
+			}
+		}
+		time.Sleep(1 * time.Second)
+		err = con.SetDeadline(time.Now().Add(timeout))
+		if err != nil {
+			ret.Status = UDPError
+			ret.Error = err
+			framework.Logf("Continuous echo (%q): %v", url, err)
+			numErrors++
+			continue
+		}
+		myRequest := fmt.Sprintf("echo %d", i)
+		_, err = con.Write([]byte(fmt.Sprintf("%s\n", myRequest)))
+		if err != nil {
+			ret.Error = err
+			neterr, ok := err.(net.Error)
+			if ok && neterr.Timeout() {
+				ret.Status = UDPTimeout
+			} else if strings.Contains(err.Error(), "connection refused") {
+				ret.Status = UDPRefused
+			} else {
+				ret.Status = UDPError
+			}
+			numErrors++
+			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
+			continue
+		}
+
+		err = con.SetDeadline(time.Now().Add(timeout))
+		if err != nil {
+			ret.Status = UDPError
+			ret.Error = err
+			numErrors++
+			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
+			continue
+		}
+
+		n, err := con.Read(buf)
+		if err != nil {
+			ret.Error = err
+			neterr, ok := err.(net.Error)
+			if ok && neterr.Timeout() {
+				ret.Status = UDPTimeout
+			} else if strings.Contains(err.Error(), "connection refused") {
+				ret.Status = UDPRefused
+			} else {
+				ret.Status = UDPError
+			}
+			numErrors++
+			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
+			continue
+		}
+		ret.Response = buf[0:n]
+
+		if string(ret.Response) != fmt.Sprintf("%d", i) {
+			ret.Status = UDPBadResponse
+			ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
+			framework.Logf("Continuous echo (%q): %v", url, ret.Error)
+			numErrors++
+			continue
+
+		}
+		ret.Status = UDPSuccess
+		framework.Logf("Continuous echo (%q): success", url)
+	}
+
+	err = nil
+	if numErrors >= threshold {
+		err = fmt.Errorf("too many errors in continuous echo")
+	}
+
+	errorChannel <- err
+}
+
 // testReachableUDP tests that the given host serves UDP on the given port.
 func testReachableUDP(host string, port int, timeout time.Duration) {
 	pollfn := func() (bool, error) {
@@ -1123,6 +1240,69 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
 	})
 
+	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
+		serviceName := "nodeport-test"
+		serverPod1Name := "server-1"
+		serverPod2Name := "server-2"
+
+		ns := f.Namespace.Name
+
+		nodeIP, err := e2enode.PickIP(cs) // needed later to reach the NodePort from outside the cluster
+		framework.ExpectNoError(err)
+
+		// Create a NodePort service
+		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+		ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
+		udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+			}
+		})
+		framework.ExpectNoError(err)
+
+		// Add a backend pod to the service
+		ginkgo.By("creating a backend pod for the service " + serviceName)
+		serverPod1 := newAgnhostPod(serverPod1Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod1.Labels = udpJig.Labels
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		ginkgo.By(fmt.Sprintf("checking NodePort service %s on node with public IP %s", serviceName, nodeIP))
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod1.Name, f.Namespace.Name, framework.PodStartTimeout))
+
+		// Waiting for service to expose endpoint.
+		err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{serverPod1Name: {80}})
+		framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
+
+		// Check that the pod receives the traffic
+		ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with public IP " + nodeIP)
+		errorChannel := make(chan error)
+		signal := make(chan struct{}, 1)
+		go continuousEcho(nodeIP, int(udpService.Spec.Ports[0].NodePort), 3*time.Second, 20, signal, errorChannel)
+
+		// Create a second pod
+		ginkgo.By("creating a second pod for the service " + serviceName)
+		serverPod2 := newAgnhostPod(serverPod2Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod2.Labels = udpJig.Labels
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod2, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod2.Name, f.Namespace.Name, framework.PodStartTimeout))
+
+		// and delete the first pod
+		framework.Logf("Cleaning up %s pod", serverPod1Name)
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), serverPod1Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err, "failed to delete pod %s", serverPod1Name)
+
+		// Check that the second pod keeps receiving traffic
+		ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with public IP " + nodeIP)
+		signal <- struct{}{}
+
+		// Check that there are no errors
+		err = <-errorChannel
+		framework.ExpectNoError(err, "pod communication failed")
+
+	})
+
 	/*
 		Release : v1.16
 		Testname: Service, NodePort Service
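For anyone who wants to experiment with the send/pause/resume pattern outside the e2e framework, below is a minimal standalone sketch of what continuousEcho exercises. It stands up an in-process UDP echo server instead of agnhost netexec, and all names, addresses, and counts (`echoServer`, `continuousEchoSketch`, `127.0.0.1`, 20 attempts, threshold 10) are illustrative assumptions, not part of this PR; the real test's 1s sleep between packets is omitted so the demo finishes quickly.

```go
// Standalone sketch (assumed, not from the PR) of the continuous-echo pattern:
// send a batch of packets over one connection, pause on a signal channel,
// then resume and report a single error (or nil) at the end.
package main

import (
	"fmt"
	"net"
	"time"
)

// echoServer answers every datagram with its payload (a simplified stand-in
// for agnhost netexec, which answers "echo <msg>" with "<msg>").
func echoServer(conn *net.UDPConn) {
	buf := make([]byte, 64)
	for {
		n, addr, err := conn.ReadFromUDP(buf)
		if err != nil {
			return
		}
		conn.WriteToUDP(buf[:n], addr)
	}
}

// continuousEchoSketch sends numbered requests over one connection, pauses at
// the threshold until signaled, and counts failures instead of failing fast.
func continuousEchoSketch(addr string, attempts, threshold int, signal <-chan struct{}, errs chan<- error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		errs <- fmt.Errorf("dial %s: %v", addr, err)
		return
	}
	defer conn.Close()

	failures := 0
	buf := make([]byte, 64)
	for i := 0; i < attempts; i++ {
		if i == threshold {
			<-signal // in the e2e test, the backend pods are cycled before this fires
		}
		conn.SetDeadline(time.Now().Add(3 * time.Second))
		request := fmt.Sprintf("%d", i)
		if _, err := conn.Write([]byte(request)); err != nil {
			failures++
			continue
		}
		n, err := conn.Read(buf)
		if err != nil || string(buf[:n]) != request {
			failures++
		}
	}
	if failures >= threshold {
		errs <- fmt.Errorf("too many errors in continuous echo: %d", failures)
		return
	}
	errs <- nil
}

func main() {
	laddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:0")
	server, err := net.ListenUDP("udp", laddr)
	if err != nil {
		panic(err)
	}
	defer server.Close()
	go echoServer(server)

	errs := make(chan error)
	signal := make(chan struct{}, 1)
	go continuousEchoSketch(server.LocalAddr().String(), 20, 10, signal, errs)

	// This is the window where the e2e test creates the second pod and
	// deletes the first; here we just let the first batch of packets flow.
	time.Sleep(2 * time.Second)
	signal <- struct{}{}

	if err := <-errs; err != nil {
		fmt.Println("continuous echo failed:", err)
		return
	}
	fmt.Println("continuous echo succeeded")
}
```

The reason the test reuses a single connection is that the kernel's conntrack entry pins the UDP flow to the first backend; the pause is the window in which that backend is deleted, so the second batch of packets only gets through if the stale conntrack state is cleaned up when the endpoint changes.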