diff --git a/test/e2e/network/BUILD b/test/e2e/network/BUILD
index 54bbe373cc0..979b274c153 100644
--- a/test/e2e/network/BUILD
+++ b/test/e2e/network/BUILD
@@ -8,6 +8,7 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
+        "conntrack.go",
         "dns.go",
         "dns_common.go",
         "dns_configmap.go",
diff --git a/test/e2e/network/conntrack.go b/test/e2e/network/conntrack.go
new file mode 100644
index 00000000000..583b212a143
--- /dev/null
+++ b/test/e2e/network/conntrack.go
@@ -0,0 +1,246 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package network
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/onsi/ginkgo"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+)
+
+const (
+	serviceName = "svc-udp"
+	podClient   = "pod-client"
+	podBackend1 = "pod-server-1"
+	podBackend2 = "pod-server-2"
+	srcPort     = 12345
+)
+
+// Linux NAT uses conntrack to perform NAT: every time a new flow is seen,
+// an entry is created in the conntrack table, and the NAT module uses it to
+// translate subsequent packets of that flow.
+// Each entry in the conntrack table has an associated timeout that removes
+// the entry once it expires.
+// UDP is a connectionless protocol, so the conntrack tracking functions for
+// it are not very advanced.
+// It uses a short timeout (30 sec by default) that is renewed whenever new
+// packets match the entry; otherwise the entry expires.
+// This behaviour can cause issues in Kubernetes when an entry in the
+// conntrack table never expires because the sender never stops sending
+// traffic, but the pods or endpoints it points to were deleted, blackholing
+// the traffic.
+// To mitigate this problem, Kubernetes deletes the stale entries:
+// - when an endpoint is removed
+// - when a service goes from no endpoints to new endpoints
+//
+// Ref: https://api.semanticscholar.org/CorpusID:198903401
+// Boye, Magnus. “Netfilter Connection Tracking and NAT Implementation.” (2012).
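+//
+// The tests below exercise this behaviour end to end: a client pod keeps
+// sending UDP traffic to a Service while its backing pods are replaced, and
+// each test asserts that traffic reaches the replacement backend instead of
+// being blackholed by a stale conntrack entry.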
+
+var _ = SIGDescribe("Conntrack", func() {
+
+	fr := framework.NewDefaultFramework("conntrack")
+
+	type nodeInfo struct {
+		name   string
+		nodeIP string
+	}
+
+	var (
+		cs                             clientset.Interface
+		ns                             string
+		clientNodeInfo, serverNodeInfo nodeInfo
+	)
+
+	ginkgo.BeforeEach(func() {
+		cs = fr.ClientSet
+		ns = fr.Namespace.Name
+
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		if len(nodes.Items) < 2 {
+			e2eskipper.Skipf(
+				"Test requires >= 2 Ready nodes, but there are only %v nodes",
+				len(nodes.Items))
+		}
+
+		ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
+
+		clientNodeInfo = nodeInfo{
+			name:   nodes.Items[0].Name,
+			nodeIP: ips[0],
+		}
+
+		serverNodeInfo = nodeInfo{
+			name:   nodes.Items[1].Name,
+			nodeIP: ips[1],
+		}
+	})
+
+	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
+
+		// Create a NodePort service
+		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+		ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
+		udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+			}
+		})
+		framework.ExpectNoError(err)
+
+		// Create a client pod on one node that sends UDP traffic to the NodePort service every 5 seconds
+		ginkgo.By("creating a client pod for probing the service " + serviceName)
+		clientPod := newAgnhostPod(podClient, "")
+		clientPod.Spec.NodeName = clientNodeInfo.name
+		cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort)
+		clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+		clientPod.Spec.Containers[0].Name = podClient
+		fr.PodClient().CreateSync(clientPod)
+
+		// Read the client pod logs
+		logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+
+		// Add a backend pod to the service on the other node
+		ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
+		serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod1.Labels = udpJig.Labels
+		serverPod1.Spec.NodeName = serverNodeInfo.name
+		fr.PodClient().CreateSync(serverPod1)
+
+		// Waiting for service to expose endpoint.
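+		// validateEndpointsPorts waits until the Endpoints object for the
+		// service lists podBackend1 on port 80, so the backend is routable
+		// before we start asserting on the client logs.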
+		err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
+		framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
+
+		// Check that the pod receives the traffic
+		// UDP conntrack entries time out after 30 sec by default
+		ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
+		time.Sleep(30 * time.Second)
+		logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+		if !strings.Contains(string(logs), podBackend1) {
+			framework.Failf("Failed to connect to backend 1")
+		}
+
+		// Create a second pod
+		ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
+		serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod2.Labels = udpJig.Labels
+		serverPod2.Spec.NodeName = serverNodeInfo.name
+		fr.PodClient().CreateSync(serverPod2)
+
+		// and delete the first pod
+		framework.Logf("Cleaning up %s pod", podBackend1)
+		fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
+
+		// Check that the second pod keeps receiving traffic
+		// UDP conntrack entries time out after 30 sec by default
+		ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
+		time.Sleep(30 * time.Second)
+		logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+		if !strings.Contains(string(logs), podBackend2) {
+			framework.Failf("Failed to connect to backend 2")
+		}
+	})
+
+	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a ClusterIP service", func() {
+
+		// Create a ClusterIP service
+		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+		ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
+		udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+			}
+		})
+		framework.ExpectNoError(err)
+
+		// Create a client pod on one node that sends UDP traffic to the ClusterIP service every 5 seconds
+		ginkgo.By("creating a client pod for probing the service " + serviceName)
+		clientPod := newAgnhostPod(podClient, "")
+		clientPod.Spec.NodeName = clientNodeInfo.name
+		cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
+		clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+		clientPod.Spec.Containers[0].Name = podClient
+		fr.PodClient().CreateSync(clientPod)
+
+		// Read the client pod logs
+		logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+
+		// Add a backend pod to the service on the other node
+		ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
+		serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod1.Labels = udpJig.Labels
+		serverPod1.Spec.NodeName = serverNodeInfo.name
+		fr.PodClient().CreateSync(serverPod1)
+
+		// Waiting for service to expose endpoint.
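+		// As above, wait until the Endpoints object lists podBackend1 on
+		// port 80 before asserting on the client pod logs.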
+		err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
+		framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
+
+		// Check that the pod receives the traffic
+		// UDP conntrack entries time out after 30 sec by default
+		ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
+		time.Sleep(30 * time.Second)
+		logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+		if !strings.Contains(string(logs), podBackend1) {
+			framework.Failf("Failed to connect to backend 1")
+		}
+
+		// Create a second pod
+		ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
+		serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod2.Labels = udpJig.Labels
+		serverPod2.Spec.NodeName = serverNodeInfo.name
+		fr.PodClient().CreateSync(serverPod2)
+
+		// and delete the first pod
+		framework.Logf("Cleaning up %s pod", podBackend1)
+		fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
+
+		// Check that the second pod keeps receiving traffic
+		// UDP conntrack entries time out after 30 sec by default
+		ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
+		time.Sleep(30 * time.Second)
+		logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+		if !strings.Contains(string(logs), podBackend2) {
+			framework.Failf("Failed to connect to backend 2")
+		}
+	})
+})
diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index 61f3468d89b..51f3dda1dac 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -519,123 +519,6 @@ func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPo
 	return ret
 }
 
-// continuousEcho() uses the same connection for multiple requests, made to run as a goroutine so that
-// manipulations can be made to the service and backend pods while a connection is ongoing
-// it starts by sending a series of packets to establish conntrack entries and waits for a signal to keep
-// sending packts. It returns an error if the number of failed attempts is >= 5
-func continuousEcho(host string, port int, timeout time.Duration, maxAttempts int, signal chan struct{}, errorChannel chan error) {
-	defer ginkgo.GinkgoRecover()
-	const threshold = 10
-
-	// Sanity check inputs, because it has happened.  These are the only things
-	// that should hard fail the test - they are basically ASSERT()s.
-	if host == "" {
-		errorChannel <- fmt.Errorf("Got empty host for continuous echo (%s)", host)
-		return
-	}
-	if port == 0 {
-		errorChannel <- fmt.Errorf("Got port ==0 for continuous echo (%d)", port)
-		return
-	}
-
-	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
-	url := fmt.Sprintf("udp://%s", hostPort)
-
-	ret := UDPPokeResult{}
-
-	con, err := net.Dial("udp", hostPort)
-	if err != nil {
-		ret.Status = UDPError
-		ret.Error = err
-		errorChannel <- fmt.Errorf("Connection to %q failed: %v", url, err)
-		return
-	}
-
-	numErrors := 0
-	bufsize := len(strconv.Itoa(maxAttempts)) + 1
-	var buf = make([]byte, bufsize)
-
-	for i := 0; i < maxAttempts; i++ {
-		if i == threshold {
-			framework.Logf("Continuous echo waiting for signal to continue")
-			<-signal
-			if numErrors == threshold {
-				errorChannel <- fmt.Errorf("continuous echo was not able to communicate with initial server pod")
-				return
-			}
-		}
-		time.Sleep(1 * time.Second)
-		err = con.SetDeadline(time.Now().Add(timeout))
-		if err != nil {
-			ret.Status = UDPError
-			ret.Error = err
-			framework.Logf("Continuous echo (%q): %v", url, err)
-			numErrors++
-			continue
-		}
-		myRequest := fmt.Sprintf("echo %d", i)
-		_, err = con.Write([]byte(fmt.Sprintf("%s\n", myRequest)))
-		if err != nil {
-			ret.Error = err
-			neterr, ok := err.(net.Error)
-			if ok && neterr.Timeout() {
-				ret.Status = UDPTimeout
-			} else if strings.Contains(err.Error(), "connection refused") {
-				ret.Status = UDPRefused
-			} else {
-				ret.Status = UDPError
-			}
-			numErrors++
-			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
-			continue
-		}
-
-		err = con.SetDeadline(time.Now().Add(timeout))
-		if err != nil {
-			ret.Status = UDPError
-			ret.Error = err
-			numErrors++
-			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
-			continue
-		}
-
-		n, err := con.Read(buf)
-		if err != nil {
-			ret.Error = err
-			neterr, ok := err.(net.Error)
-			if ok && neterr.Timeout() {
-				ret.Status = UDPTimeout
-			} else if strings.Contains(err.Error(), "connection refused") {
-				ret.Status = UDPRefused
-			} else {
-				ret.Status = UDPError
-			}
-			numErrors++
-			framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
-			continue
-		}
-		ret.Response = buf[0:n]
-
-		if string(ret.Response) != fmt.Sprintf("%d", i) {
-			ret.Status = UDPBadResponse
-			ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
-			framework.Logf("Continuous echo (%q): %v", url, ret.Error)
-			numErrors++
-			continue
-
-		}
-		ret.Status = UDPSuccess
-		framework.Logf("Continuous echo(%q): success", url)
-	}
-
-	err = nil
-	if numErrors >= threshold {
-		err = fmt.Errorf("Too many Errors in continuous echo")
-	}
-
-	errorChannel <- err
-}
-
 // testReachableUDP tests that the given host serves UDP on the given port.
 func testReachableUDP(host string, port int, timeout time.Duration) {
 	pollfn := func() (bool, error) {
@@ -1241,69 +1124,6 @@ var _ = SIGDescribe("Services", func() {
 		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
 	})
 
-	ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
-		serviceName := "clusterip-test"
-		serverPod1Name := "server-1"
-		serverPod2Name := "server-2"
-
-		ns := f.Namespace.Name
-
-		nodeIP, err := e2enode.PickIP(cs) // for later
-		framework.ExpectNoError(err)
-
-		// Create a NodePort service
-		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
-		ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
-		udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
-			svc.Spec.Type = v1.ServiceTypeNodePort
-			svc.Spec.Ports = []v1.ServicePort{
-				{Port: 80, Name: "http", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
-			}
-		})
-		framework.ExpectNoError(err)
-
-		// Add a backend pod to the service
-		ginkgo.By("creating a backend pod for the service " + serviceName)
-		serverPod1 := newAgnhostPod(serverPod1Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
-		serverPod1.Labels = udpJig.Labels
-		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod1, metav1.CreateOptions{})
-		ginkgo.By(fmt.Sprintf("checking NodePort service %s on node with public IP %s", serviceName, nodeIP))
-		framework.ExpectNoError(err)
-		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod1.Name, f.Namespace.Name, framework.PodStartTimeout))
-
-		// Waiting for service to expose endpoint.
-		err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{serverPod1Name: {80}})
-		framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
-
-		// Check that the pod reveives the traffic
-		ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
-		errorChannel := make(chan error)
-		signal := make(chan struct{}, 1)
-		go continuousEcho(nodeIP, int(udpService.Spec.Ports[0].NodePort), 3*time.Second, 20, signal, errorChannel)
-
-		// Create a second pod
-		ginkgo.By("creating a second pod for the service " + serviceName)
-		serverPod2 := newAgnhostPod(serverPod2Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
-		serverPod2.Labels = udpJig.Labels
-		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod2, metav1.CreateOptions{})
-		framework.ExpectNoError(err)
-		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod2.Name, f.Namespace.Name, framework.PodStartTimeout))
-
-		// and delete the first pod
-		framework.Logf("Cleaning up %s pod", serverPod1Name)
-		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), serverPod1Name, metav1.DeleteOptions{})
-		framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPod1Name)
-
-		// Check that the second pod keeps receiving traffic
-		ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
-		signal <- struct{}{}
-
-		// Check that there are no errors
-		err = <-errorChannel
-		framework.ExpectNoError(err, "pod communication failed")
-
-	})
-
 	/*
 	   Release : v1.16
 	   Testname: Service, NodePort Service
diff --git a/test/e2e/network/util.go b/test/e2e/network/util.go
index 13acfd6d38b..ec6048c7b1e 100644
--- a/test/e2e/network/util.go
+++ b/test/e2e/network/util.go
@@ -64,11 +64,13 @@ func DescribeSvc(ns string) {
 // newAgnhostPod returns a pod that uses the agnhost image. The image's binary supports various subcommands
 // that behave the same, no matter the underlying OS.
 func newAgnhostPod(name string, args ...string) *v1.Pod {
+	zero := int64(0)
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
 		},
 		Spec: v1.PodSpec{
+			TerminationGracePeriodSeconds: &zero,
 			Containers: []v1.Container{
 				{
 					Name: "agnhost",