From 2fae847120c7a9489fea1f332358180cd6f15f6b Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Tue, 24 Nov 2015 15:38:11 -0500 Subject: [PATCH] Fix TestPortForward flake TestPortForward was failing occasionally due to the way the test was written. It created a port forwarding session, then connected a client to the local port, attempted to send some data, attempted to receive some data, and then tore down the port forwarding session. Unfortunately, some times the attempt to send data from the client to the remote would be enqueued but not processed by the time the test tore down everything. As a result, the data stream could get closed before the client's data was transmitted to the server. If this happened, you'd see an error such as 'forward 2 ports with bidirectional data: server expected to receive "ghij", got "" for port 6000'. This fixes the test by serializing the data flow: the client writes to the remote, the remote waits to receive that data, the remote writes to the client, and the client waits to receive the data from the remote. This all takes place prior to the test tearing down port forwarding. --- .../portforward/portforward_test.go | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/pkg/client/unversioned/portforward/portforward_test.go b/pkg/client/unversioned/portforward/portforward_test.go index e82b74f6497..2d22da370c8 100644 --- a/pkg/client/unversioned/portforward/portforward_test.go +++ b/pkg/client/unversioned/portforward/portforward_test.go @@ -207,6 +207,8 @@ func TestGetListener(t *testing.T) { // kubelet.PortForwarder. type fakePortForwarder struct { lock sync.Mutex + // stores data expected from the stream per port + expected map[uint16]string // stores data received from the stream per port received map[uint16]string // data to be sent to the stream per port @@ -218,33 +220,23 @@ var _ kubelet.PortForwarder = &fakePortForwarder{} func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { defer stream.Close() - var wg sync.WaitGroup + // read from the client + received := make([]byte, len(pf.expected[port])) + n, err := stream.Read(received) + if err != nil { + return fmt.Errorf("error reading from client for port %d: %v", port, err) + } + if n != len(pf.expected[port]) { + return fmt.Errorf("unexpected length read from client for port %d: got %d, expected %d. data=%q", port, n, len(pf.expected[port]), string(received)) + } - // client -> server - wg.Add(1) - go func() { - defer wg.Done() + // store the received content + pf.lock.Lock() + pf.received[port] = string(received) + pf.lock.Unlock() - // copy from stream into a buffer - received := new(bytes.Buffer) - io.Copy(received, stream) - - // store the received content - pf.lock.Lock() - pf.received[port] = received.String() - pf.lock.Unlock() - }() - - // server -> client - wg.Add(1) - go func() { - defer wg.Done() - - // send the hardcoded data to the stream - io.Copy(stream, strings.NewReader(pf.send[port])) - }() - - wg.Wait() + // send the hardcoded data to the client + io.Copy(stream, strings.NewReader(pf.send[port])) return nil } @@ -254,6 +246,7 @@ func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16 func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[uint16]string) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { pf := &fakePortForwarder{ + expected: expectedFromClient, received: make(map[uint16]string), send: serverSends, }