Merge pull request #128681 from soltysh/client-go_port_forward_reset

Client go port forward reset, error handling and tests
This commit is contained in:
Kubernetes Prow Robot
2024-11-07 23:33:03 +00:00
committed by GitHub
2 changed files with 209 additions and 4 deletions

View File

@@ -37,7 +37,13 @@ import (
// TODO move to API machinery and re-unify with kubelet/server/portfoward // TODO move to API machinery and re-unify with kubelet/server/portfoward
const PortForwardProtocolV1Name = "portforward.k8s.io" const PortForwardProtocolV1Name = "portforward.k8s.io"
var ErrLostConnectionToPod = errors.New("lost connection to pod") var (
// error returned whenever we lost connection to a pod
ErrLostConnectionToPod = errors.New("lost connection to pod")
// set of error we're expecting during port-forwarding
networkClosedError = "use of closed network connection"
)
// PortForwarder knows how to listen for local connections and forward them to // PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request. // a remote pod via an upgraded HTTP request.
@@ -312,7 +318,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener? // TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err)) runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
} }
return return
@@ -381,7 +387,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
go func() { go func() {
// Copy from the remote side to the local port. // Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
} }
@@ -394,7 +400,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer dataStream.Close() defer dataStream.Close()
// Copy from the local port to the remote side. // Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
// break out of the select below without waiting for the other copy to finish // break out of the select below without waiting for the other copy to finish
close(localError) close(localError)
@@ -407,6 +413,11 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
case <-localError: case <-localError:
} }
// reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
// we must reset dataStream before waiting on errorChan, otherwise,
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
_ = dataStream.Reset()
// always expect something on errorChan (it may be nil) // always expect something on errorChan (it may be nil)
err = <-errorChan err = <-errorChan
if err != nil { if err != nil {

View File

@@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http"
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
@@ -35,6 +36,9 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
@@ -123,6 +127,71 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
} }
} }
func pfNeverReadRequestBodyPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "issue-74551",
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "server",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{
"netexec",
"--http-port=80",
},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{
IntVal: int32(80),
},
Scheme: v1.URISchemeHTTP,
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 60,
PeriodSeconds: 1,
},
},
},
},
}
}
func testWebServerPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "testwebserver",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"test-webserver"},
Ports: []v1.ContainerPort{{ContainerPort: int32(80)}},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt32(int32(80)),
},
},
InitialDelaySeconds: 5,
TimeoutSeconds: 3,
FailureThreshold: 10,
},
},
},
},
}
}
// WaitForTerminatedContainer waits till a given container be terminated for a given pod. // WaitForTerminatedContainer waits till a given container be terminated for a given pod.
func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error { func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error {
return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) { return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
@@ -493,6 +562,110 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
doTestOverWebSockets(ctx, "localhost", f) doTestOverWebSockets(ctx, "localhost", f)
}) })
}) })
ginkgo.Describe("with a pod being removed", func() {
ginkgo.It("should stop port-forwarding", func(ctx context.Context) {
ginkgo.By("Creating the target pod")
pod := pfNeverReadRequestBodyPod()
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
framework.ExpectNoError(err, "couldn't create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
framework.ExpectNoError(err, "pod did not start running")
ginkgo.By("Running 'kubectl port-forward'")
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
ginkgo.By("Running port-forward client")
reqChan := make(chan bool)
errorChan := make(chan error)
go func() {
defer ginkgo.GinkgoRecover()
// try to mock a big request, which should take some time
for sentBodySize := 0; sentBodySize < 1024*1024*1024; {
size := rand.Intn(4 * 1024 * 1024)
url := fmt.Sprintf("http://localhost:%d/header", cmd.port)
_, err := post(url, strings.NewReader(strings.Repeat("x", size)), nil)
if err != nil {
errorChan <- err
}
ginkgo.By(fmt.Sprintf("Sent %d chunk of data", sentBodySize))
if sentBodySize == 0 {
close(reqChan)
}
sentBodySize += size
}
}()
ginkgo.By("Remove the forwarded pod after the first client request")
<-reqChan
e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name)
ginkgo.By("Wait for client being interrupted")
select {
case err = <-errorChan:
case <-time.After(e2epod.DefaultPodDeletionTimeout):
}
ginkgo.By("Check the client error")
gomega.Expect(err).To(gomega.HaveOccurred())
gomega.Expect(err.Error()).To(gomega.Or(gomega.ContainSubstring("connection reset by peer"), gomega.ContainSubstring("EOF")))
ginkgo.By("Check kubectl port-forward exit code")
gomega.Expect(cmd.cmd.ProcessState.ExitCode()).To(gomega.BeNumerically("<", 0), "kubectl port-forward should finish with non-zero exit code")
})
})
ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
ginkgo.By("Creating the target pod")
pod := testWebServerPod()
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
framework.ExpectNoError(err, "couldn't create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
framework.ExpectNoError(err, "pod did not start running")
ginkgo.By("Running 'kubectl port-forward'")
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
ginkgo.By("Send a http request to verify port-forward working")
client := http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
framework.ExpectNoError(err, "couldn't get http response from port-forward")
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
ginkgo.By("Dialing the local port")
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
framework.ExpectNoError(err, "couldn't connect to port %d", cmd.port)
// use raw tcp connection to emulate client close connection without reading response
ginkgo.By("Request agnhost binary file (40MB+)")
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
for _, line := range requestLines {
_, err := conn.Write(append([]byte(line), []byte("\r\n")...))
framework.ExpectNoError(err, "couldn't write http request to local connection")
}
ginkgo.By("Read only one byte from the connection")
_, err = conn.Read(make([]byte, 1))
framework.ExpectNoError(err, "couldn't read from the local connection")
ginkgo.By("Close client connection without reading remain data")
err = conn.Close()
framework.ExpectNoError(err, "couldn't close local connection")
ginkgo.By("Send another http request through port-forward again")
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
framework.ExpectNoError(err, "couldn't get http response from port-forward")
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
})
})
}) })
func wsRead(conn *websocket.Conn) (byte, []byte, error) { func wsRead(conn *websocket.Conn) (byte, []byte, error) {
@@ -521,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
err := websocket.Message.Send(conn, frame) err := websocket.Message.Send(conn, frame)
return err return err
} }
func post(url string, reader io.Reader, transport *http.Transport) (string, error) {
if transport == nil {
transport = utilnet.SetTransportDefaults(&http.Transport{})
}
client := &http.Client{Transport: transport}
req, err := http.NewRequest(http.MethodPost, url, reader)
if err != nil {
return "", err
}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close() //nolint: errcheck
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}