From cc3cc93e6a4a4da254f3500d1f320bd1ee0ecd96 Mon Sep 17 00:00:00 2001 From: LiHui Date: Mon, 29 Aug 2022 14:15:01 +0800 Subject: [PATCH] kubectl: fix memory leaks in port forwarding client Signed-off-by: LiHui Kubernetes-commit: 1df24569a0bf62a528c49f73fdb236fd56eb05ee --- tools/portforward/portforward.go | 2 ++ tools/portforward/portforward_test.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/tools/portforward/portforward.go b/tools/portforward/portforward.go index f4c1984d..6b5e3076 100644 --- a/tools/portforward/portforward.go +++ b/tools/portforward/portforward.go @@ -347,6 +347,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { } // we're not writing to this stream errorStream.Close() + defer pf.streamConn.RemoveStreams(errorStream) errorChan := make(chan error) go func() { @@ -367,6 +368,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)) return } + defer pf.streamConn.RemoveStreams(dataStream) localError := make(chan struct{}) remoteDone := make(chan struct{}) diff --git a/tools/portforward/portforward_test.go b/tools/portforward/portforward_test.go index 04427e12..ada70339 100644 --- a/tools/portforward/portforward_test.go +++ b/tools/portforward/portforward_test.go @@ -51,6 +51,7 @@ type fakeConnection struct { closeChan chan bool dataStream *fakeStream errorStream *fakeStream + streamCount int } func newFakeConnection() *fakeConnection { @@ -64,8 +65,10 @@ func newFakeConnection() *fakeConnection { func (c *fakeConnection) CreateStream(headers http.Header) (httpstream.Stream, error) { switch headers.Get(v1.StreamType) { case v1.StreamTypeData: + c.streamCount++ return c.dataStream, nil case v1.StreamTypeError: + c.streamCount++ return c.errorStream, nil default: return nil, fmt.Errorf("fakeStream creation not supported for stream type %s", headers.Get(v1.StreamType)) @@ -84,7 +87,10 @@ func (c *fakeConnection) CloseChan() <-chan bool { return c.closeChan } -func (c *fakeConnection) RemoveStreams(_ ...httpstream.Stream) { +func (c *fakeConnection) RemoveStreams(streams ...httpstream.Stream) { + for range streams { + c.streamCount-- + } } func (c *fakeConnection) SetIdleTimeout(timeout time.Duration) { @@ -504,7 +510,7 @@ func TestHandleConnection(t *testing.T) { // Test handleConnection pf.handleConnection(localConnection, ForwardedPort{Local: 1111, Remote: 2222}) - + assert.Equal(t, 0, remoteConnection.streamCount, "stream count should be zero") assert.Equal(t, "test data from local", remoteDataReceived.String()) assert.Equal(t, "test data from remote", localConnection.receiveBuffer.String()) assert.Equal(t, "Handling connection for 1111\n", out.String()) @@ -538,6 +544,7 @@ func TestHandleConnectionSendsRemoteError(t *testing.T) { // Test handleConnection, using go-routine because it needs to be able to write to unbuffered pf.errorChan pf.handleConnection(localConnection, ForwardedPort{Local: 1111, Remote: 2222}) + assert.Equal(t, 0, remoteConnection.streamCount, "stream count should be zero") assert.Equal(t, "", remoteDataReceived.String()) assert.Equal(t, "", localConnection.receiveBuffer.String()) assert.Equal(t, "Handling connection for 1111\n", out.String())