Merge pull request #128681 from soltysh/client-go_port_forward_reset

Client go port forward reset, error handling and tests

Kubernetes-commit: 210deea063a5a778e8c3a8e32b8bc4c808b87835
This commit is contained in:
Kubernetes Publisher 2024-11-07 23:33:03 +00:00
commit fe3db7fea6

View File

@ -37,7 +37,13 @@ import (
// TODO move to API machinery and re-unify with kubelet/server/portfoward // TODO move to API machinery and re-unify with kubelet/server/portfoward
const PortForwardProtocolV1Name = "portforward.k8s.io" const PortForwardProtocolV1Name = "portforward.k8s.io"
var ErrLostConnectionToPod = errors.New("lost connection to pod") var (
// error returned whenever we lost connection to a pod
ErrLostConnectionToPod = errors.New("lost connection to pod")
// set of error we're expecting during port-forwarding
networkClosedError = "use of closed network connection"
)
// PortForwarder knows how to listen for local connections and forward them to // PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request. // a remote pod via an upgraded HTTP request.
@ -312,7 +318,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener? // TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err)) runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
} }
return return
@ -381,7 +387,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
go func() { go func() {
// Copy from the remote side to the local port. // Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
} }
@ -394,7 +400,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer dataStream.Close() defer dataStream.Close()
// Copy from the local port to the remote side. // Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
// break out of the select below without waiting for the other copy to finish // break out of the select below without waiting for the other copy to finish
close(localError) close(localError)
@ -407,6 +413,11 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
case <-localError: case <-localError:
} }
// reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
// we must reset dataStream before waiting on errorChan, otherwise,
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
_ = dataStream.Reset()
// always expect something on errorChan (it may be nil) // always expect something on errorChan (it may be nil)
err = <-errorChan err = <-errorChan
if err != nil { if err != nil {