diff --git a/test/e2e/network/conntrack.go b/test/e2e/network/conntrack.go
index face3f563ad..794ec9617c8 100644
--- a/test/e2e/network/conntrack.go
+++ b/test/e2e/network/conntrack.go
@@ -17,6 +17,7 @@ limitations under the License.
 package network
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"time"
@@ -32,6 +33,7 @@ import (
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	imageutils "k8s.io/kubernetes/test/utils/image"
 )
 
 const (
@@ -76,9 +78,9 @@ var _ = SIGDescribe("Conntrack", func() {
 		clientNodeInfo, serverNodeInfo nodeInfo
 	)
 
-	logContainsFn := func(text string) wait.ConditionFunc {
+	logContainsFn := func(text, podName string) wait.ConditionFunc {
 		return func() (bool, error) {
-			logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+			logs, err := e2epod.GetPodLogs(cs, ns, podName, podName)
 			if err != nil {
 				// Retry the error next time.
 				return false, nil
@@ -168,7 +170,7 @@ var _ = SIGDescribe("Conntrack", func() {
 		// 30 seconds by default.
 		// Based on the above check if the pod receives the traffic.
 		ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
-		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1)); err != nil {
+		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
 			logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
 			framework.ExpectNoError(err)
 			framework.Logf("Pod client logs: %s", logs)
@@ -193,7 +195,7 @@ var _ = SIGDescribe("Conntrack", func() {
 		// Check that the second pod keeps receiving traffic
 		// UDP conntrack entries timeout is 30 sec by default
 		ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
-		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2)); err != nil {
+		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2, podClient)); err != nil {
 			logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
 			framework.ExpectNoError(err)
 			framework.Logf("Pod client logs: %s", logs)
@@ -245,7 +247,7 @@ var _ = SIGDescribe("Conntrack", func() {
 		// 30 seconds by default.
 		// Based on the above check if the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP) - if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1)); err != nil { + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil { logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) framework.ExpectNoError(err) framework.Logf("Pod client logs: %s", logs) @@ -270,11 +272,134 @@ var _ = SIGDescribe("Conntrack", func() { // Check that the second pod keeps receiving traffic // UDP conntrack entries timeout is 30 sec by default ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP) - if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2)); err != nil { + if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend2, podClient)); err != nil { logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient) framework.ExpectNoError(err) framework.Logf("Pod client logs: %s", logs) framework.Failf("Failed to connect to backend 2") } }) + + // Regression test for #74839, where: + // Packets considered INVALID by conntrack are now dropped. In particular, this fixes + // a problem where spurious retransmits in a long-running TCP connection to a service + // IP could result in the connection being closed with the error "Connection reset by + // peer" + // xref: https://kubernetes.io/blog/2019/03/29/kube-proxy-subtleties-debugging-an-intermittent-connection-reset/ + ginkgo.It("should drop INVALID conntrack entries", func() { + serverLabel := map[string]string{ + "app": "boom-server", + } + + serverPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "boom-server", + Labels: serverLabel, + }, + Spec: v1.PodSpec{ + NodeName: serverNodeInfo.name, + Containers: []v1.Container{ + { + Name: "boom-server", + Image: imageutils.GetE2EImage(imageutils.RegressionIssue74839), + Ports: []v1.ContainerPort{ + { + ContainerPort: 9000, // Default port exposed by boom-server + }, + }, + Env: []v1.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + { + Name: "POD_IPS", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIPs", + }, + }, + }, + }, + }, + }, + }, + } + _, err := fr.ClientSet.CoreV1().Pods(fr.Namespace.Name).Create(context.TODO(), serverPod, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + err = e2epod.WaitForPodsRunningReady(fr.ClientSet, fr.Namespace.Name, 1, 0, framework.PodReadyBeforeTimeout, map[string]string{}) + framework.ExpectNoError(err) + + ginkgo.By("Server pod created on node " + serverNodeInfo.name) + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "boom-server", + }, + Spec: v1.ServiceSpec{ + Selector: serverLabel, + Ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: 9000, + }, + }, + }, + } + _, err = fr.ClientSet.CoreV1().Services(fr.Namespace.Name).Create(context.TODO(), svc, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Server service created") + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "startup-script", + }, + Spec: v1.PodSpec{ + NodeName: clientNodeInfo.name, + Containers: []v1.Container{ + { + Name: "startup-script", + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{ + "sh", "-c", "while true; do sleep 2; nc boom-server 9000& done", + }, + }, + 
+				},
+				RestartPolicy: v1.RestartPolicyNever,
+			},
+		}
+		_, err = fr.ClientSet.CoreV1().Pods(fr.Namespace.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
+		framework.ExpectNoError(err)
+
+		ginkgo.By("Client pod created")
+
+		// The client will open connections against the server.
+		// The server will inject invalid packets.
+		// If conntrack does not drop the invalid packets, they will go through without NAT,
+		// so the client will receive an unexpected TCP connection and RST the connection;
+		// the server will log ERROR if that happens.
+		ginkgo.By("checking client pod does not RST the TCP connection because it receives an INVALID packet")
+		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn("ERROR", "boom-server")); err == nil {
+			logs, err := e2epod.GetPodLogs(cs, ns, "boom-server", "boom-server")
+			framework.ExpectNoError(err)
+			framework.Logf("boom-server pod logs: %s", logs)
+			framework.Failf("Boom server pod received a RST from the client")
+		}
+
+		logs, err := e2epod.GetPodLogs(cs, ns, "boom-server", "boom-server")
+		framework.ExpectNoError(err)
+		if !strings.Contains(string(logs), "connection established") {
+			framework.Failf("Boom server pod did not send any bad packet to the client")
+		}
+		framework.Logf("boom-server pod logs: %s", logs)
+		framework.Logf("boom-server did not receive any RST packet")
+	})
 })
diff --git a/test/e2e/network/kube_proxy.go b/test/e2e/network/kube_proxy.go
index 2f268fac144..a8253d2567e 100644
--- a/test/e2e/network/kube_proxy.go
+++ b/test/e2e/network/kube_proxy.go
@@ -17,7 +17,6 @@ limitations under the License.
 package network
 
 import (
-	"context"
 	"fmt"
 	"math"
 	"net"
@@ -37,7 +36,6 @@ import (
 	netutils "k8s.io/utils/net"
 
 	"github.com/onsi/ginkgo"
-	"github.com/onsi/gomega"
 )
 
 var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost)
@@ -240,117 +238,4 @@ var _ = SIGDescribe("KubeProxy", func() {
 		}
 	})
 
-	// Regression test for #74839, where:
-	// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
-	// a problem where spurious retransmits in a long-running TCP connection to a service
-	// IP could result in the connection being closed with the error "Connection reset by
-	// peer"
-	ginkgo.It("should resolve connection reset issue #74839 [Slow]", func() {
-		serverLabel := map[string]string{
-			"app": "boom-server",
-		}
-		clientLabel := map[string]string{
-			"app": "client",
-		}
-
-		serverPod := &v1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:   "boom-server",
-				Labels: serverLabel,
-			},
-			Spec: v1.PodSpec{
-				Containers: []v1.Container{
-					{
-						Name:  "boom-server",
-						Image: imageutils.GetE2EImage(imageutils.RegressionIssue74839),
-						Ports: []v1.ContainerPort{
-							{
-								ContainerPort: 9000, // Default port exposed by boom-server
-							},
-						},
-					},
-				},
-				Affinity: &v1.Affinity{
-					PodAntiAffinity: &v1.PodAntiAffinity{
-						RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
-							{
-								LabelSelector: &metav1.LabelSelector{
-									MatchLabels: clientLabel,
-								},
-								TopologyKey: "kubernetes.io/hostname",
-							},
-						},
-					},
-				},
-			},
-		}
-		_, err := fr.ClientSet.CoreV1().Pods(fr.Namespace.Name).Create(context.TODO(), serverPod, metav1.CreateOptions{})
-		framework.ExpectNoError(err)
-
-		err = e2epod.WaitForPodsRunningReady(fr.ClientSet, fr.Namespace.Name, 1, 0, framework.PodReadyBeforeTimeout, map[string]string{})
-		framework.ExpectNoError(err)
-
-		ginkgo.By("Server pod created")
-
-		svc := &v1.Service{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "boom-server",
-			},
-			Spec: v1.ServiceSpec{
-				Selector: serverLabel,
-				Ports: []v1.ServicePort{
-					{
-						Protocol: v1.ProtocolTCP,
-						Port:     9000,
-					},
-				},
-			},
-		}
-		_, err = fr.ClientSet.CoreV1().Services(fr.Namespace.Name).Create(context.TODO(), svc, metav1.CreateOptions{})
-		framework.ExpectNoError(err)
-
-		ginkgo.By("Server service created")
-
-		pod := &v1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:   "startup-script",
-				Labels: clientLabel,
-			},
-			Spec: v1.PodSpec{
-				Containers: []v1.Container{
-					{
-						Name:  "startup-script",
-						Image: imageutils.GetE2EImage(imageutils.BusyBox),
-						Command: []string{
-							"sh", "-c", "while true; do sleep 2; nc boom-server 9000& done",
-						},
-					},
-				},
-				Affinity: &v1.Affinity{
-					PodAntiAffinity: &v1.PodAntiAffinity{
-						RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
-							{
-								LabelSelector: &metav1.LabelSelector{
-									MatchLabels: serverLabel,
-								},
-								TopologyKey: "kubernetes.io/hostname",
-							},
-						},
-					},
-				},
-				RestartPolicy: v1.RestartPolicyNever,
-			},
-		}
-		_, err = fr.ClientSet.CoreV1().Pods(fr.Namespace.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
-		framework.ExpectNoError(err)
-
-		ginkgo.By("Client pod created")
-
-		for i := 0; i < 20; i++ {
-			time.Sleep(3 * time.Second)
-			resultPod, err := fr.ClientSet.CoreV1().Pods(fr.Namespace.Name).Get(context.TODO(), serverPod.Name, metav1.GetOptions{})
-			framework.ExpectNoError(err)
-			gomega.Expect(resultPod.Status.ContainerStatuses[0].LastTerminationState.Terminated).Should(gomega.BeNil())
-		}
-	})
 })