fix: draining remote stream after port-forward connection broken

Signed-off-by: Nic <qianyong@api7.ai>
This commit is contained in:
Nic 2024-10-04 14:48:15 +08:00 committed by Maciej Szulik
parent 847be85000
commit dbe6b6657b
No known key found for this signature in database
GPG Key ID: F15E55D276FA84C4
2 changed files with 99 additions and 0 deletions

View File

@ -406,6 +406,11 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
case <-remoteDone: case <-remoteDone:
case <-localError: case <-localError:
} }
/*
reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
*/
_ = dataStream.Reset()
// always expect something on errorChan (it may be nil) // always expect something on errorChan (it may be nil)
err = <-errorChan err = <-errorChan

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http"
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
@ -123,6 +124,36 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
} }
} }
func testWebServerPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "testwebserver",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"test-webserver"},
Ports: []v1.ContainerPort{{ContainerPort: int32(80)}},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt32(int32(80)),
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 3,
FailureThreshold: 10,
},
},
},
},
}
}
// WaitForTerminatedContainer waits till a given container be terminated for a given pod. // WaitForTerminatedContainer waits till a given container be terminated for a given pod.
func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error { func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error {
return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) { return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
@ -493,6 +524,69 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
doTestOverWebSockets(ctx, "localhost", f) doTestOverWebSockets(ctx, "localhost", f)
}) })
}) })
ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
ginkgo.By("Creating the target pod")
pod := testWebServerPod()
if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
framework.Failf("Couldn't create pod: %v", err)
}
if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
framework.Failf("Pod did not start running: %v", err)
}
ginkgo.By("Running 'kubectl port-forward'")
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
ginkgo.By("Send a http request to verify port-forward working")
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
if err != nil {
framework.Failf("Couldn't get http response from port-forward: %v", err)
}
if resp.StatusCode != http.StatusOK {
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
ginkgo.By("Dialing the local port")
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
if err != nil {
framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
}
// use raw tcp connection to emulate client close connection without reading response
ginkgo.By("Request agohost binary file (40MB+)")
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
for _, line := range requestLines {
if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil {
framework.Failf("Couldn't write http request to local connection: %v", err)
}
}
ginkgo.By("Read only one byte from the connection")
if _, err := conn.Read(make([]byte, 1)); err != nil {
framework.Logf("Couldn't reading from the local connection: %v", err)
}
ginkgo.By("Close client connection without reading remain data")
if err := conn.Close(); err != nil {
framework.Failf("Couldn't close local connection: %v", err)
}
ginkgo.By("Send another http request through port-forward again")
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
if err != nil {
framework.Failf("Couldn't get http response from port-forward: %v", err)
}
if resp.StatusCode != http.StatusOK {
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
}
})
})
}) })
func wsRead(conn *websocket.Conn) (byte, []byte, error) { func wsRead(conn *websocket.Conn) (byte, []byte, error) {