mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Clean error handling in port-forward
This commit introduces: 1. Cleanups in port-forwarding error handling code, which ensures that we only compare lowercased text always. 2. E2E verifying that when a pod is removed a port-forward is stopped. Signed-off-by: Maciej Szulik <soltysh@gmail.com>
This commit is contained in:
parent
dbe6b6657b
commit
0b1617ccef
@ -37,7 +37,13 @@ import (
|
|||||||
// TODO move to API machinery and re-unify with kubelet/server/portfoward
|
// TODO move to API machinery and re-unify with kubelet/server/portfoward
|
||||||
const PortForwardProtocolV1Name = "portforward.k8s.io"
|
const PortForwardProtocolV1Name = "portforward.k8s.io"
|
||||||
|
|
||||||
var ErrLostConnectionToPod = errors.New("lost connection to pod")
|
var (
|
||||||
|
// error returned whenever we lost connection to a pod
|
||||||
|
ErrLostConnectionToPod = errors.New("lost connection to pod")
|
||||||
|
|
||||||
|
// set of error we're expecting during port-forwarding
|
||||||
|
networkClosedError = "use of closed network connection"
|
||||||
|
)
|
||||||
|
|
||||||
// PortForwarder knows how to listen for local connections and forward them to
|
// PortForwarder knows how to listen for local connections and forward them to
|
||||||
// a remote pod via an upgraded HTTP request.
|
// a remote pod via an upgraded HTTP request.
|
||||||
@ -312,7 +318,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
|
|||||||
conn, err := listener.Accept()
|
conn, err := listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
|
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
|
||||||
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
|
if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||||
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
|
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -381,7 +387,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// Copy from the remote side to the local port.
|
// Copy from the remote side to the local port.
|
||||||
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||||
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
|
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -394,7 +400,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
|||||||
defer dataStream.Close()
|
defer dataStream.Close()
|
||||||
|
|
||||||
// Copy from the local port to the remote side.
|
// Copy from the local port to the remote side.
|
||||||
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
|
||||||
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
|
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
|
||||||
// break out of the select below without waiting for the other copy to finish
|
// break out of the select below without waiting for the other copy to finish
|
||||||
close(localError)
|
close(localError)
|
||||||
@ -406,10 +412,10 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
|
|||||||
case <-remoteDone:
|
case <-remoteDone:
|
||||||
case <-localError:
|
case <-localError:
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
|
// reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
|
||||||
we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
|
// we must reset dataStream before waiting on errorChan, otherwise,
|
||||||
*/
|
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
|
||||||
_ = dataStream.Reset()
|
_ = dataStream.Reset()
|
||||||
|
|
||||||
// always expect something on errorChan (it may be nil)
|
// always expect something on errorChan (it may be nil)
|
||||||
|
@ -36,6 +36,9 @@ import (
|
|||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
|
"k8s.io/apimachinery/pkg/util/rand"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
|
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
|
||||||
@ -124,6 +127,41 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pfNeverReadRequestBodyPod() *v1.Pod {
|
||||||
|
return &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "issue-74551",
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
RestartPolicy: v1.RestartPolicyNever,
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{
|
||||||
|
Name: "server",
|
||||||
|
Image: imageutils.GetE2EImage(imageutils.Agnhost),
|
||||||
|
Args: []string{
|
||||||
|
"netexec",
|
||||||
|
"--http-port=80",
|
||||||
|
},
|
||||||
|
ReadinessProbe: &v1.Probe{
|
||||||
|
ProbeHandler: v1.ProbeHandler{
|
||||||
|
HTTPGet: &v1.HTTPGetAction{
|
||||||
|
Path: "/healthz",
|
||||||
|
Port: intstr.IntOrString{
|
||||||
|
IntVal: int32(80),
|
||||||
|
},
|
||||||
|
Scheme: v1.URISchemeHTTP,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
InitialDelaySeconds: 5,
|
||||||
|
TimeoutSeconds: 60,
|
||||||
|
PeriodSeconds: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testWebServerPod() *v1.Pod {
|
func testWebServerPod() *v1.Pod {
|
||||||
return &v1.Pod{
|
return &v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@ -525,16 +563,70 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
ginkgo.Describe("with a pod being removed", func() {
|
||||||
|
ginkgo.It("should stop port-forwarding", func(ctx context.Context) {
|
||||||
|
ginkgo.By("Creating the target pod")
|
||||||
|
pod := pfNeverReadRequestBodyPod()
|
||||||
|
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
|
||||||
|
framework.ExpectNoError(err, "couldn't create pod")
|
||||||
|
|
||||||
|
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
|
||||||
|
framework.ExpectNoError(err, "pod did not start running")
|
||||||
|
|
||||||
|
ginkgo.By("Running 'kubectl port-forward'")
|
||||||
|
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
|
||||||
|
defer cmd.Stop()
|
||||||
|
|
||||||
|
ginkgo.By("Running port-forward client")
|
||||||
|
reqChan := make(chan bool)
|
||||||
|
errorChan := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer ginkgo.GinkgoRecover()
|
||||||
|
|
||||||
|
// try to mock a big request, which should take some time
|
||||||
|
for sentBodySize := 0; sentBodySize < 1024*1024*1024; {
|
||||||
|
size := rand.Intn(4 * 1024 * 1024)
|
||||||
|
url := fmt.Sprintf("http://localhost:%d/header", cmd.port)
|
||||||
|
_, err := post(url, strings.NewReader(strings.Repeat("x", size)), nil)
|
||||||
|
if err != nil {
|
||||||
|
errorChan <- err
|
||||||
|
}
|
||||||
|
ginkgo.By(fmt.Sprintf("Sent %d chunk of data", sentBodySize))
|
||||||
|
if sentBodySize == 0 {
|
||||||
|
close(reqChan)
|
||||||
|
}
|
||||||
|
sentBodySize += size
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ginkgo.By("Remove the forwarded pod after the first client request")
|
||||||
|
<-reqChan
|
||||||
|
e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name)
|
||||||
|
|
||||||
|
ginkgo.By("Wait for client being interrupted")
|
||||||
|
select {
|
||||||
|
case err = <-errorChan:
|
||||||
|
case <-time.After(e2epod.DefaultPodDeletionTimeout):
|
||||||
|
}
|
||||||
|
|
||||||
|
ginkgo.By("Check the client error")
|
||||||
|
gomega.Expect(err).To(gomega.HaveOccurred())
|
||||||
|
gomega.Expect(err.Error()).To(gomega.Or(gomega.ContainSubstring("connection reset by peer"), gomega.ContainSubstring("EOF")))
|
||||||
|
|
||||||
|
ginkgo.By("Check kubectl port-forward exit code")
|
||||||
|
gomega.Expect(cmd.cmd.ProcessState.ExitCode()).To(gomega.BeNumerically("<", 0), "kubectl port-forward should finish with non-zero exit code")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
|
ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
|
||||||
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
|
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
|
||||||
ginkgo.By("Creating the target pod")
|
ginkgo.By("Creating the target pod")
|
||||||
pod := testWebServerPod()
|
pod := testWebServerPod()
|
||||||
if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
|
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
|
||||||
framework.Failf("Couldn't create pod: %v", err)
|
framework.ExpectNoError(err, "couldn't create pod")
|
||||||
}
|
|
||||||
if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
|
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
|
||||||
framework.Failf("Pod did not start running: %v", err)
|
framework.ExpectNoError(err, "pod did not start running")
|
||||||
}
|
|
||||||
|
|
||||||
ginkgo.By("Running 'kubectl port-forward'")
|
ginkgo.By("Running 'kubectl port-forward'")
|
||||||
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
|
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
|
||||||
@ -542,49 +634,36 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
|
|||||||
|
|
||||||
ginkgo.By("Send a http request to verify port-forward working")
|
ginkgo.By("Send a http request to verify port-forward working")
|
||||||
client := http.Client{
|
client := http.Client{
|
||||||
Timeout: 5 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
|
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
|
||||||
if err != nil {
|
framework.ExpectNoError(err, "couldn't get http response from port-forward")
|
||||||
framework.Failf("Couldn't get http response from port-forward: %v", err)
|
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
|
||||||
}
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
ginkgo.By("Dialing the local port")
|
ginkgo.By("Dialing the local port")
|
||||||
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
|
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
|
||||||
if err != nil {
|
framework.ExpectNoError(err, "couldn't connect to port %d", cmd.port)
|
||||||
framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// use raw tcp connection to emulate client close connection without reading response
|
// use raw tcp connection to emulate client close connection without reading response
|
||||||
ginkgo.By("Request agohost binary file (40MB+)")
|
ginkgo.By("Request agnhost binary file (40MB+)")
|
||||||
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
|
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
|
||||||
for _, line := range requestLines {
|
for _, line := range requestLines {
|
||||||
if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil {
|
_, err := conn.Write(append([]byte(line), []byte("\r\n")...))
|
||||||
framework.Failf("Couldn't write http request to local connection: %v", err)
|
framework.ExpectNoError(err, "couldn't write http request to local connection")
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ginkgo.By("Read only one byte from the connection")
|
ginkgo.By("Read only one byte from the connection")
|
||||||
if _, err := conn.Read(make([]byte, 1)); err != nil {
|
_, err = conn.Read(make([]byte, 1))
|
||||||
framework.Logf("Couldn't reading from the local connection: %v", err)
|
framework.ExpectNoError(err, "couldn't read from the local connection")
|
||||||
}
|
|
||||||
|
|
||||||
ginkgo.By("Close client connection without reading remain data")
|
ginkgo.By("Close client connection without reading remain data")
|
||||||
if err := conn.Close(); err != nil {
|
err = conn.Close()
|
||||||
framework.Failf("Couldn't close local connection: %v", err)
|
framework.ExpectNoError(err, "couldn't close local connection")
|
||||||
}
|
|
||||||
|
|
||||||
ginkgo.By("Send another http request through port-forward again")
|
ginkgo.By("Send another http request through port-forward again")
|
||||||
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
|
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
|
||||||
if err != nil {
|
framework.ExpectNoError(err, "couldn't get http response from port-forward")
|
||||||
framework.Failf("Couldn't get http response from port-forward: %v", err)
|
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
|
||||||
}
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -615,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
|
|||||||
err := websocket.Message.Send(conn, frame)
|
err := websocket.Message.Send(conn, frame)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func post(url string, reader io.Reader, transport *http.Transport) (string, error) {
|
||||||
|
if transport == nil {
|
||||||
|
transport = utilnet.SetTransportDefaults(&http.Transport{})
|
||||||
|
}
|
||||||
|
client := &http.Client{Transport: transport}
|
||||||
|
req, err := http.NewRequest(http.MethodPost, url, reader)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close() //nolint: errcheck
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(body), nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user