From dbe6b6657bacc846656f4009ee869ca996dde1da Mon Sep 17 00:00:00 2001 From: Nic Date: Fri, 4 Oct 2024 14:48:15 +0800 Subject: [PATCH] fix: draining remote stream after port-forward connection broken Signed-off-by: Nic --- .../tools/portforward/portforward.go | 5 + test/e2e/kubectl/portforward.go | 94 +++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/portforward/portforward.go b/staging/src/k8s.io/client-go/tools/portforward/portforward.go index 83ef3e929b3..ec26efdd282 100644 --- a/staging/src/k8s.io/client-go/tools/portforward/portforward.go +++ b/staging/src/k8s.io/client-go/tools/portforward/portforward.go @@ -406,6 +406,11 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { case <-remoteDone: case <-localError: } + /* + reset dataStream to discard any unsent data, preventing port forwarding from being blocked. + we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely. + */ + _ = dataStream.Reset() // always expect something on errorChan (it may be nil) err = <-errorChan diff --git a/test/e2e/kubectl/portforward.go b/test/e2e/kubectl/portforward.go index 1271fb5d497..fb1699788ec 100644 --- a/test/e2e/kubectl/portforward.go +++ b/test/e2e/kubectl/portforward.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "net" + "net/http" "os/exec" "regexp" "strconv" @@ -123,6 +124,36 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi } } +func testWebServerPod() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: map[string]string{"name": podName}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "testwebserver", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Args: []string{"test-webserver"}, + Ports: []v1.ContainerPort{{ContainerPort: int32(80)}}, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(int32(80)), + }, + }, + InitialDelaySeconds: 5, + TimeoutSeconds: 3, + FailureThreshold: 10, + }, + }, + }, + }, + } +} + // WaitForTerminatedContainer waits till a given container be terminated for a given pod. func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error { return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) { @@ -493,6 +524,69 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() { doTestOverWebSockets(ctx, "localhost", f) }) }) + + ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() { + ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) { + ginkgo.By("Creating the target pod") + pod := testWebServerPod() + if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + framework.Failf("Couldn't create pod: %v", err) + } + if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil { + framework.Failf("Pod did not start running: %v", err) + } + + ginkgo.By("Running 'kubectl port-forward'") + cmd := runPortForward(f.Namespace.Name, pod.Name, 80) + defer cmd.Stop() + + ginkgo.By("Send a http request to verify port-forward working") + client := http.Client{ + Timeout: 5 * time.Second, + } + resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port)) + if err != nil { + framework.Failf("Couldn't get http response from port-forward: %v", err) + } + if resp.StatusCode != http.StatusOK { + framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + + ginkgo.By("Dialing the local port") + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port)) + if err != nil { + framework.Failf("Couldn't connect to port %d: %v", cmd.port, err) + } + + // use raw tcp connection to emulate client close connection without reading response + ginkgo.By("Request agohost binary file (40MB+)") + requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""} + for _, line := range requestLines { + if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil { + framework.Failf("Couldn't write http request to local connection: %v", err) + } + } + + ginkgo.By("Read only one byte from the connection") + if _, err := conn.Read(make([]byte, 1)); err != nil { + framework.Logf("Couldn't reading from the local connection: %v", err) + } + + ginkgo.By("Close client connection without reading remain data") + if err := conn.Close(); err != nil { + framework.Failf("Couldn't close local connection: %v", err) + } + + ginkgo.By("Send another http request through port-forward again") + resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port)) + if err != nil { + framework.Failf("Couldn't get http response from port-forward: %v", err) + } + if resp.StatusCode != http.StatusOK { + framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode) + } + }) + }) }) func wsRead(conn *websocket.Conn) (byte, []byte, error) {