e2e conntrack tests

deflake current e2e test
"should be able to preserve UDP traffic when server pod cycles for a
NodePort service" and reorganize the code in the e2e framework

Signed-off-by: Antonio Ojea <antonio.ojea.garcia@gmail.com>
Antonio Ojea 2020-06-13 13:44:59 +02:00
parent 653eb230f2
commit 27d32661c2
3 changed files with 247 additions and 180 deletions


@@ -8,6 +8,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"conntrack.go",
"dns.go",
"dns_common.go",
"dns_configmap.go",


@@ -0,0 +1,246 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"fmt"
"strings"
"time"
"github.com/onsi/ginkgo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
)
const (
serviceName = "svc-udp"
podClient = "pod-client"
podBackend1 = "pod-server-1"
podBackend2 = "pod-server-2"
srcPort = 12345
)
// Linux NAT uses conntrack to perform NAT: every time a new
// flow is seen, a connection is created in the conntrack table, and it
// is used by the NAT module.
// Each entry in the conntrack table has an associated timeout that removes
// the connection once it expires.
// UDP is a connectionless protocol, so the conntrack module tracking functions
// are not very advanced.
// It uses a short timeout (30 sec by default) that is renewed if there are new flows
// matching the connection. Otherwise it expires the entry.
// This behaviour can cause issues in Kubernetes when an entry in the conntrack table
// never expires because the sender does not stop sending traffic, yet the pods or
// endpoints it was created for were deleted, blackholing the traffic.
// In order to mitigate this problem, Kubernetes deletes the stale entries:
// - when an endpoint is removed
// - when a service goes from no endpoints to at least one endpoint
// Ref: https://api.semanticscholar.org/CorpusID:198903401
// Boye, Magnus. "Netfilter Connection Tracking and NAT Implementation." (2012).
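// For illustration only (not part of this test): kube-proxy removes such
// stale entries by shelling out to the conntrack CLI. A minimal sketch of
// the idea, assuming the conntrack binary is available on the node:
//
//	// flushUDPEntries deletes every UDP conntrack entry whose original
//	// destination is ip, so the next datagram creates a fresh entry.
//	func flushUDPEntries(ip string) error {
//		out, err := exec.Command("conntrack", "-D", "--orig-dst", ip, "-p", "udp").CombinedOutput()
//		if err != nil {
//			return fmt.Errorf("conntrack -D failed: %v: %s", err, out)
//		}
//		return nil
//	}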
var _ = SIGDescribe("Conntrack", func() {
fr := framework.NewDefaultFramework("conntrack")
type nodeInfo struct {
name string
nodeIP string
}
var (
cs clientset.Interface
ns string
clientNodeInfo, serverNodeInfo nodeInfo
)
ginkgo.BeforeEach(func() {
cs = fr.ClientSet
ns = fr.Namespace.Name
nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
framework.ExpectNoError(err)
if len(nodes.Items) < 2 {
e2eskipper.Skipf(
"Test requires >= 2 Ready nodes, but there are only %v nodes",
len(nodes.Items))
}
ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
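// Place the client and the backends on different nodes so the probes always
// traverse kube-proxy's NAT rules and exercise the conntrack entries.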
clientNodeInfo = nodeInfo{
name: nodes.Items[0].Name,
nodeIP: ips[0],
}
serverNodeInfo = nodeInfo{
name: nodes.Items[1].Name,
nodeIP: ips[1],
}
})
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
// Create a NodePort service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)
// Create a pod on one node that generates UDP traffic against the NodePort service every 5 seconds
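// The client uses a fixed source port (nc -p) so that every datagram belongs
// to the same conntrack flow; a stale entry would blackhole all the probes.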
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := newAgnhostPod(podClient, "")
clientPod.Spec.NodeName = clientNodeInfo.name
cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)
// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
// Add a backend pod to the service in the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
serverPod1.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(serverPod1)
// Wait for the service to expose the endpoint.
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
// Check that the pod receives the traffic
// UDP conntrack entries timeout is 30 sec by default
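// Sleep for one full timeout window so that, if the flow were blackholed by
// a stale entry, the missing replies would surface in the client pod logs.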
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
time.Sleep(30 * time.Second)
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
if !strings.Contains(string(logs), podBackend1) {
framework.Failf("Failed to connecto to backend 1")
}
// Create a second pod
ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod2.Labels = udpJig.Labels
serverPod2.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(serverPod2)
// and delete the first pod
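// Removing the only original endpoint should make kube-proxy flush the stale
// conntrack entry, letting the established flow fail over to backend 2.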
framework.Logf("Cleaning up %s pod", podBackend1)
fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
// Check that the second pod keeps receiving traffic
// UDP conntrack entries timeout is 30 sec by default
ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
time.Sleep(30 * time.Second)
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
if !strings.Contains(string(logs), podBackend2) {
framework.Failf("Failed to connect to backend 2")
}
})
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a ClusterIP service", func() {
// Create a ClusterIP service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)
// Create a pod on one node that generates UDP traffic against the ClusterIP service every 5 seconds
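// Unlike the NodePort case, the DNAT for a ClusterIP happens on the client
// node itself, so the stale entry to clean up is in that node's conntrack table.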
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := newAgnhostPod(podClient, "")
clientPod.Spec.NodeName = clientNodeInfo.name
cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)
// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
// Add a backend pod to the service in the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
serverPod1.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(serverPod1)
// Wait for the service to expose the endpoint.
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
// Check that the pod receives the traffic
// UDP conntrack entries timeout is 30 sec by default
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
time.Sleep(30 * time.Second)
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
if !strings.Contains(string(logs), podBackend1) {
framework.Failf("Failed to connect to backend 1")
}
// Create a second pod
ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod2.Labels = udpJig.Labels
serverPod2.Spec.NodeName = serverNodeInfo.name
fr.PodClient().CreateSync(serverPod2)
// and delete the first pod
framework.Logf("Cleaning up %s pod", podBackend1)
fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
// Check that the second pod keeps receiving traffic
// UDP conntrack entries timeout is 30 sec by default
ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
time.Sleep(30 * time.Second)
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
if !strings.Contains(string(logs), podBackend2) {
framework.Failf("Failed to connect to backend 2")
}
})
})


@@ -518,123 +518,6 @@ func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPo
return ret
}
// continuousEcho() uses the same connection for multiple requests. It is meant to run as a goroutine so that
// manipulations can be made to the service and backend pods while a connection is ongoing.
// It starts by sending a series of packets to establish conntrack entries and waits for a signal to keep
// sending packets. It returns an error if the number of failed attempts reaches the threshold.
func continuousEcho(host string, port int, timeout time.Duration, maxAttempts int, signal chan struct{}, errorChannel chan error) {
defer ginkgo.GinkgoRecover()
const threshold = 10
// Sanity check inputs, because it has happened. These are the only things
// that should hard fail the test - they are basically ASSERT()s.
if host == "" {
errorChannel <- fmt.Errorf("Got empty host for continuous echo (%s)", host)
return
}
if port == 0 {
errorChannel <- fmt.Errorf("Got port ==0 for continuous echo (%d)", port)
return
}
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("udp://%s", hostPort)
ret := UDPPokeResult{}
con, err := net.Dial("udp", hostPort)
if err != nil {
ret.Status = UDPError
ret.Error = err
errorChannel <- fmt.Errorf("Connection to %q failed: %v", url, err)
return
}
numErrors := 0
bufsize := len(strconv.Itoa(maxAttempts)) + 1
var buf = make([]byte, bufsize)
for i := 0; i < maxAttempts; i++ {
if i == threshold {
framework.Logf("Continuous echo waiting for signal to continue")
<-signal
if numErrors == threshold {
errorChannel <- fmt.Errorf("continuous echo was not able to communicate with initial server pod")
return
}
}
time.Sleep(1 * time.Second)
err = con.SetDeadline(time.Now().Add(timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
framework.Logf("Continuous echo (%q): %v", url, err)
numErrors++
continue
}
myRequest := fmt.Sprintf("echo %d", i)
_, err = con.Write([]byte(fmt.Sprintf("%s\n", myRequest)))
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
numErrors++
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
continue
}
err = con.SetDeadline(time.Now().Add(timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
numErrors++
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
continue
}
n, err := con.Read(buf)
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
numErrors++
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
continue
}
ret.Response = buf[0:n]
if string(ret.Response) != fmt.Sprintf("%d", i) {
ret.Status = UDPBadResponse
ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
framework.Logf("Continuous echo (%q): %v", url, ret.Error)
numErrors++
continue
}
ret.Status = UDPSuccess
framework.Logf("Continuous echo(%q): success", url)
}
err = nil
if numErrors >= threshold {
err = fmt.Errorf("Too many Errors in continuous echo")
}
errorChannel <- err
}
// testReachableUDP tests that the given host serves UDP on the given port.
func testReachableUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
@@ -1240,69 +1123,6 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
})
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
serviceName := "clusterip-test"
serverPod1Name := "server-1"
serverPod2Name := "server-2"
ns := f.Namespace.Name
nodeIP, err := e2enode.PickIP(cs) // for later
framework.ExpectNoError(err)
// Create a NodePort service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)
// Add a backend pod to the service
ginkgo.By("creating a backend pod for the service " + serviceName)
serverPod1 := newAgnhostPod(serverPod1Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod1, metav1.CreateOptions{})
ginkgo.By(fmt.Sprintf("checking NodePort service %s on node with public IP %s", serviceName, nodeIP))
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod1.Name, f.Namespace.Name, framework.PodStartTimeout))
// Waiting for service to expose endpoint.
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{serverPod1Name: {80}})
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
// Check that the pod receives the traffic
ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
errorChannel := make(chan error)
signal := make(chan struct{}, 1)
go continuousEcho(nodeIP, int(udpService.Spec.Ports[0].NodePort), 3*time.Second, 20, signal, errorChannel)
// Create a second pod
ginkgo.By("creating a second pod for the service " + serviceName)
serverPod2 := newAgnhostPod(serverPod2Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod2.Labels = udpJig.Labels
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod2, metav1.CreateOptions{})
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod2.Name, f.Namespace.Name, framework.PodStartTimeout))
// and delete the first pod
framework.Logf("Cleaning up %s pod", serverPod1Name)
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), serverPod1Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPod1Name)
// Check that the second pod keeps receiving traffic
ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
signal <- struct{}{}
// Check that there are no errors
err = <-errorChannel
framework.ExpectNoError(err, "pod communication failed")
})
/*
Release : v1.16
Testname: Service, NodePort Service