Merge pull request #114460 from brianpursley/pf-exit

portforward: return error on lost connection to pod

Kubernetes-commit: 61cdb86814489624dd5a169526797e186a118e66
This commit is contained in:
Kubernetes Publisher 2023-01-05 11:07:58 -08:00
commit bd7ed9e647
2 changed files with 64 additions and 1 deletions

View File

@ -37,6 +37,8 @@ import (
// TODO move to API machinery and re-unify with kubelet/server/portfoward // TODO move to API machinery and re-unify with kubelet/server/portfoward
const PortForwardProtocolV1Name = "portforward.k8s.io" const PortForwardProtocolV1Name = "portforward.k8s.io"
var ErrLostConnectionToPod = errors.New("lost connection to pod")
// PortForwarder knows how to listen for local connections and forward them to // PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request. // a remote pod via an upgraded HTTP request.
type PortForwarder struct { type PortForwarder struct {
@ -230,7 +232,7 @@ func (pf *PortForwarder) forward() error {
select { select {
case <-pf.stopChan: case <-pf.stopChan:
case <-pf.streamConn.CloseChan(): case <-pf.streamConn.CloseChan():
runtime.HandleError(errors.New("lost connection to pod")) return ErrLostConnectionToPod
} }
return nil return nil

View File

@ -567,3 +567,64 @@ func TestWaitForConnectionExitsOnStreamConnClosed(t *testing.T) {
port := ForwardedPort{} port := ForwardedPort{}
pf.waitForConnection(&listener, port) pf.waitForConnection(&listener, port)
} }
func TestForwardPortsReturnsErrorWhenConnectionIsLost(t *testing.T) {
dialer := &fakeDialer{
conn: newFakeConnection(),
}
stopChan := make(chan struct{})
readyChan := make(chan struct{})
errChan := make(chan error)
pf, err := New(dialer, []string{":5000"}, stopChan, readyChan, os.Stdout, os.Stderr)
if err != nil {
t.Fatalf("failed to create new PortForwarder: %s", err)
}
go func() {
errChan <- pf.ForwardPorts()
}()
<-pf.Ready
// Simulate lost pod connection by closing streamConn, which should result in pf.ForwardPorts() returning an error.
pf.streamConn.Close()
err = <-errChan
if err == nil {
t.Fatalf("unexpected non-error from pf.ForwardPorts()")
} else if err != ErrLostConnectionToPod {
t.Fatalf("unexpected error from pf.ForwardPorts(): %s", err)
}
}
func TestForwardPortsReturnsNilWhenStopChanIsClosed(t *testing.T) {
dialer := &fakeDialer{
conn: newFakeConnection(),
}
stopChan := make(chan struct{})
readyChan := make(chan struct{})
errChan := make(chan error)
pf, err := New(dialer, []string{":5000"}, stopChan, readyChan, os.Stdout, os.Stderr)
if err != nil {
t.Fatalf("failed to create new PortForwarder: %s", err)
}
go func() {
errChan <- pf.ForwardPorts()
}()
<-pf.Ready
// Closing (or sending to) stopChan indicates a stop request by the caller, which should result in pf.ForwardPorts()
// returning nil.
close(stopChan)
err = <-errChan
if err != nil {
t.Fatalf("unexpected error from pf.ForwardPorts(): %s", err)
}
}