mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Add a test that checks if conntrack entries are cleaned up for UDP traffic
ensure that when a pod servicing UDP traffic is deleted the conntrack entries are cleaned up and another backend can pick up the traffic with minimal interruption When using NodePort services and long running connections that on pod deletion stale conntrack entries can halt the flow of traffic. Add a test case to check that conntrack entries are cleaned up.
This commit is contained in:
parent
9380cb1b13
commit
59082e80e3
@ -518,6 +518,123 @@ func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPo
|
||||
return ret
|
||||
}
|
||||
|
||||
// continuousEcho() uses the same connection for multiple requests, made to run as a goroutine so that
|
||||
// manipulations can be made to the service and backend pods while a connection is ongoing
|
||||
// it starts by sending a series of packets to establish conntrack entries and waits for a signal to keep
|
||||
// sending packts. It returns an error if the number of failed attempts is >= 5
|
||||
func continuousEcho(host string, port int, timeout time.Duration, maxAttempts int, signal chan struct{}, errorChannel chan error) {
|
||||
defer ginkgo.GinkgoRecover()
|
||||
const threshold = 10
|
||||
|
||||
// Sanity check inputs, because it has happened. These are the only things
|
||||
// that should hard fail the test - they are basically ASSERT()s.
|
||||
if host == "" {
|
||||
errorChannel <- fmt.Errorf("Got empty host for continuous echo (%s)", host)
|
||||
return
|
||||
}
|
||||
if port == 0 {
|
||||
errorChannel <- fmt.Errorf("Got port ==0 for continuous echo (%d)", port)
|
||||
return
|
||||
}
|
||||
|
||||
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
|
||||
url := fmt.Sprintf("udp://%s", hostPort)
|
||||
|
||||
ret := UDPPokeResult{}
|
||||
|
||||
con, err := net.Dial("udp", hostPort)
|
||||
if err != nil {
|
||||
ret.Status = UDPError
|
||||
ret.Error = err
|
||||
errorChannel <- fmt.Errorf("Connection to %q failed: %v", url, err)
|
||||
return
|
||||
}
|
||||
|
||||
numErrors := 0
|
||||
bufsize := len(strconv.Itoa(maxAttempts)) + 1
|
||||
var buf = make([]byte, bufsize)
|
||||
|
||||
for i := 0; i < maxAttempts; i++ {
|
||||
if i == threshold {
|
||||
framework.Logf("Continuous echo waiting for signal to continue")
|
||||
<-signal
|
||||
if numErrors == threshold {
|
||||
errorChannel <- fmt.Errorf("continuous echo was not able to communicate with initial server pod")
|
||||
return
|
||||
}
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
err = con.SetDeadline(time.Now().Add(timeout))
|
||||
if err != nil {
|
||||
ret.Status = UDPError
|
||||
ret.Error = err
|
||||
framework.Logf("Continuous echo (%q): %v", url, err)
|
||||
numErrors++
|
||||
continue
|
||||
}
|
||||
myRequest := fmt.Sprintf("echo %d", i)
|
||||
_, err = con.Write([]byte(fmt.Sprintf("%s\n", myRequest)))
|
||||
if err != nil {
|
||||
ret.Error = err
|
||||
neterr, ok := err.(net.Error)
|
||||
if ok && neterr.Timeout() {
|
||||
ret.Status = UDPTimeout
|
||||
} else if strings.Contains(err.Error(), "connection refused") {
|
||||
ret.Status = UDPRefused
|
||||
} else {
|
||||
ret.Status = UDPError
|
||||
}
|
||||
numErrors++
|
||||
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
|
||||
continue
|
||||
}
|
||||
|
||||
err = con.SetDeadline(time.Now().Add(timeout))
|
||||
if err != nil {
|
||||
ret.Status = UDPError
|
||||
ret.Error = err
|
||||
numErrors++
|
||||
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
|
||||
continue
|
||||
}
|
||||
|
||||
n, err := con.Read(buf)
|
||||
if err != nil {
|
||||
ret.Error = err
|
||||
neterr, ok := err.(net.Error)
|
||||
if ok && neterr.Timeout() {
|
||||
ret.Status = UDPTimeout
|
||||
} else if strings.Contains(err.Error(), "connection refused") {
|
||||
ret.Status = UDPRefused
|
||||
} else {
|
||||
ret.Status = UDPError
|
||||
}
|
||||
numErrors++
|
||||
framework.Logf("Continuous echo (%q): %v - %d errors seen so far", url, err, numErrors)
|
||||
continue
|
||||
}
|
||||
ret.Response = buf[0:n]
|
||||
|
||||
if string(ret.Response) != fmt.Sprintf("%d", i) {
|
||||
ret.Status = UDPBadResponse
|
||||
ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
|
||||
framework.Logf("Continuous echo (%q): %v", url, ret.Error)
|
||||
numErrors++
|
||||
continue
|
||||
|
||||
}
|
||||
ret.Status = UDPSuccess
|
||||
framework.Logf("Continuous echo(%q): success", url)
|
||||
}
|
||||
|
||||
err = nil
|
||||
if numErrors >= threshold {
|
||||
err = fmt.Errorf("Too many Errors in continuous echo")
|
||||
}
|
||||
|
||||
errorChannel <- err
|
||||
}
|
||||
|
||||
// testReachableUDP tests that the given host serves UDP on the given port.
|
||||
func testReachableUDP(host string, port int, timeout time.Duration) {
|
||||
pollfn := func() (bool, error) {
|
||||
@ -1123,6 +1240,69 @@ var _ = SIGDescribe("Services", func() {
|
||||
framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, host, podNames2, svc2IP, servicePort))
|
||||
})
|
||||
|
||||
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
|
||||
serviceName := "clusterip-test"
|
||||
serverPod1Name := "server-1"
|
||||
serverPod2Name := "server-2"
|
||||
|
||||
ns := f.Namespace.Name
|
||||
|
||||
nodeIP, err := e2enode.PickIP(cs) // for later
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Create a NodePort service
|
||||
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
|
||||
ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
|
||||
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
|
||||
svc.Spec.Type = v1.ServiceTypeNodePort
|
||||
svc.Spec.Ports = []v1.ServicePort{
|
||||
{Port: 80, Name: "http", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
|
||||
}
|
||||
})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Add a backend pod to the service
|
||||
ginkgo.By("creating a backend pod for the service " + serviceName)
|
||||
serverPod1 := newAgnhostPod(serverPod1Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
|
||||
serverPod1.Labels = udpJig.Labels
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod1, metav1.CreateOptions{})
|
||||
ginkgo.By(fmt.Sprintf("checking NodePort service %s on node with public IP %s", serviceName, nodeIP))
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod1.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
// Waiting for service to expose endpoint.
|
||||
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{serverPod1Name: {80}})
|
||||
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
|
||||
|
||||
// Check that the pod reveives the traffic
|
||||
ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
|
||||
errorChannel := make(chan error)
|
||||
signal := make(chan struct{}, 1)
|
||||
go continuousEcho(nodeIP, int(udpService.Spec.Ports[0].NodePort), 3*time.Second, 20, signal, errorChannel)
|
||||
|
||||
// Create a second pod
|
||||
ginkgo.By("creating a second pod for the service " + serviceName)
|
||||
serverPod2 := newAgnhostPod(serverPod2Name, "netexec", fmt.Sprintf("--udp-port=%d", 80))
|
||||
serverPod2.Labels = udpJig.Labels
|
||||
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), serverPod2, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, serverPod2.Name, f.Namespace.Name, framework.PodStartTimeout))
|
||||
|
||||
// and delete the first pod
|
||||
framework.Logf("Cleaning up %s pod", serverPod1Name)
|
||||
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), serverPod1Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPod1Name)
|
||||
|
||||
// Check that the second pod keeps receiving traffic
|
||||
ginkgo.By("Sending UDP traffic to NodePort service " + serviceName + " on node with publicIP " + nodeIP)
|
||||
signal <- struct{}{}
|
||||
|
||||
// Check that there are no errors
|
||||
err = <-errorChannel
|
||||
framework.ExpectNoError(err, "pod communication failed")
|
||||
|
||||
})
|
||||
|
||||
/*
|
||||
Release : v1.16
|
||||
Testname: Service, NodePort Service
|
||||
|
Loading…
Reference in New Issue
Block a user