From 0b1617ccefbc6ea61c0e7c2b0b4052703f11c51c Mon Sep 17 00:00:00 2001 From: Maciej Szulik Date: Wed, 16 Oct 2024 14:02:01 +0200 Subject: [PATCH] Clean error handling in port-forward This commit introduces: 1. Cleanups in port-forwarding error handling code, which ensures that we only compare lowercased text always. 2. E2E verifying that when a pod is removed a port-forward is stopped. Signed-off-by: Maciej Szulik --- .../tools/portforward/portforward.go | 22 ++- test/e2e/kubectl/portforward.go | 164 ++++++++++++++---- 2 files changed, 146 insertions(+), 40 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/portforward/portforward.go b/staging/src/k8s.io/client-go/tools/portforward/portforward.go index ec26efdd282..126c14e8fa0 100644 --- a/staging/src/k8s.io/client-go/tools/portforward/portforward.go +++ b/staging/src/k8s.io/client-go/tools/portforward/portforward.go @@ -37,7 +37,13 @@ import ( // TODO move to API machinery and re-unify with kubelet/server/portfoward const PortForwardProtocolV1Name = "portforward.k8s.io" -var ErrLostConnectionToPod = errors.New("lost connection to pod") +var ( + // error returned whenever we lost connection to a pod + ErrLostConnectionToPod = errors.New("lost connection to pod") + + // set of error we're expecting during port-forwarding + networkClosedError = "use of closed network connection" +) // PortForwarder knows how to listen for local connections and forward them to // a remote pod via an upgraded HTTP request. @@ -312,7 +318,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded conn, err := listener.Accept() if err != nil { // TODO consider using something like https://github.com/hydrogen18/stoppableListener? - if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { + if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) { runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err)) } return @@ -381,7 +387,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { go func() { // Copy from the remote side to the local port. - if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) { runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) } @@ -394,7 +400,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { defer dataStream.Close() // Copy from the local port to the remote side. - if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) { runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) // break out of the select below without waiting for the other copy to finish close(localError) @@ -406,10 +412,10 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { case <-remoteDone: case <-localError: } - /* - reset dataStream to discard any unsent data, preventing port forwarding from being blocked. - we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely. - */ + + // reset dataStream to discard any unsent data, preventing port forwarding from being blocked. + // we must reset dataStream before waiting on errorChan, otherwise, + // the blocking data will affect errorStream and cause <-errorChan to block indefinitely. _ = dataStream.Reset() // always expect something on errorChan (it may be nil) diff --git a/test/e2e/kubectl/portforward.go b/test/e2e/kubectl/portforward.go index fb1699788ec..a8eac7c538a 100644 --- a/test/e2e/kubectl/portforward.go +++ b/test/e2e/kubectl/portforward.go @@ -36,6 +36,9 @@ import ( "golang.org/x/net/websocket" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/test/e2e/framework" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" @@ -124,6 +127,41 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi } } +func pfNeverReadRequestBodyPod() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "issue-74551", + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: "server", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Args: []string{ + "netexec", + "--http-port=80", + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.IntOrString{ + IntVal: int32(80), + }, + Scheme: v1.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 60, + PeriodSeconds: 1, + }, + }, + }, + }, + } +} + func testWebServerPod() *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -525,16 +563,70 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() { }) }) + ginkgo.Describe("with a pod being removed", func() { + ginkgo.It("should stop port-forwarding", func(ctx context.Context) { + ginkgo.By("Creating the target pod") + pod := pfNeverReadRequestBodyPod() + _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}) + framework.ExpectNoError(err, "couldn't create pod") + + err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout) + framework.ExpectNoError(err, "pod did not start running") + + ginkgo.By("Running 'kubectl port-forward'") + cmd := runPortForward(f.Namespace.Name, pod.Name, 80) + defer cmd.Stop() + + ginkgo.By("Running port-forward client") + reqChan := make(chan bool) + errorChan := make(chan error) + go func() { + defer ginkgo.GinkgoRecover() + + // try to mock a big request, which should take some time + for sentBodySize := 0; sentBodySize < 1024*1024*1024; { + size := rand.Intn(4 * 1024 * 1024) + url := fmt.Sprintf("http://localhost:%d/header", cmd.port) + _, err := post(url, strings.NewReader(strings.Repeat("x", size)), nil) + if err != nil { + errorChan <- err + } + ginkgo.By(fmt.Sprintf("Sent %d chunk of data", sentBodySize)) + if sentBodySize == 0 { + close(reqChan) + } + sentBodySize += size + } + }() + + ginkgo.By("Remove the forwarded pod after the first client request") + <-reqChan + e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name) + + ginkgo.By("Wait for client being interrupted") + select { + case err = <-errorChan: + case <-time.After(e2epod.DefaultPodDeletionTimeout): + } + + ginkgo.By("Check the client error") + gomega.Expect(err).To(gomega.HaveOccurred()) + gomega.Expect(err.Error()).To(gomega.Or(gomega.ContainSubstring("connection reset by peer"), gomega.ContainSubstring("EOF"))) + + ginkgo.By("Check kubectl port-forward exit code") + gomega.Expect(cmd.cmd.ProcessState.ExitCode()).To(gomega.BeNumerically("<", 0), "kubectl port-forward should finish with non-zero exit code") + }) + }) + ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() { ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) { ginkgo.By("Creating the target pod") pod := testWebServerPod() - if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { - framework.Failf("Couldn't create pod: %v", err) - } - if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil { - framework.Failf("Pod did not start running: %v", err) - } + _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}) + framework.ExpectNoError(err, "couldn't create pod") + + err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout) + framework.ExpectNoError(err, "pod did not start running") ginkgo.By("Running 'kubectl port-forward'") cmd := runPortForward(f.Namespace.Name, pod.Name, 80) @@ -542,49 +634,36 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() { ginkgo.By("Send a http request to verify port-forward working") client := http.Client{ - Timeout: 5 * time.Second, + Timeout: 10 * time.Second, } resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port)) - if err != nil { - framework.Failf("Couldn't get http response from port-forward: %v", err) - } - if resp.StatusCode != http.StatusOK { - framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode) - } + framework.ExpectNoError(err, "couldn't get http response from port-forward") + gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code") ginkgo.By("Dialing the local port") conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port)) - if err != nil { - framework.Failf("Couldn't connect to port %d: %v", cmd.port, err) - } + framework.ExpectNoError(err, "couldn't connect to port %d", cmd.port) // use raw tcp connection to emulate client close connection without reading response - ginkgo.By("Request agohost binary file (40MB+)") + ginkgo.By("Request agnhost binary file (40MB+)") requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""} for _, line := range requestLines { - if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil { - framework.Failf("Couldn't write http request to local connection: %v", err) - } + _, err := conn.Write(append([]byte(line), []byte("\r\n")...)) + framework.ExpectNoError(err, "couldn't write http request to local connection") } ginkgo.By("Read only one byte from the connection") - if _, err := conn.Read(make([]byte, 1)); err != nil { - framework.Logf("Couldn't reading from the local connection: %v", err) - } + _, err = conn.Read(make([]byte, 1)) + framework.ExpectNoError(err, "couldn't read from the local connection") ginkgo.By("Close client connection without reading remain data") - if err := conn.Close(); err != nil { - framework.Failf("Couldn't close local connection: %v", err) - } + err = conn.Close() + framework.ExpectNoError(err, "couldn't close local connection") ginkgo.By("Send another http request through port-forward again") resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port)) - if err != nil { - framework.Failf("Couldn't get http response from port-forward: %v", err) - } - if resp.StatusCode != http.StatusOK { - framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode) - } + framework.ExpectNoError(err, "couldn't get http response from port-forward") + gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code") }) }) }) @@ -615,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error { err := websocket.Message.Send(conn, frame) return err } + +func post(url string, reader io.Reader, transport *http.Transport) (string, error) { + if transport == nil { + transport = utilnet.SetTransportDefaults(&http.Transport{}) + } + client := &http.Client{Transport: transport} + req, err := http.NewRequest(http.MethodPost, url, reader) + if err != nil { + return "", err + } + resp, err := client.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() //nolint: errcheck + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +}