Fix TestPortForward flake

TestPortForward was failing occasionally due to the way the test was
written. It created a port forwarding session, then connected a client
to the local port, attempted to send some data, attempted to receive
some data, and then tore down the port forwarding session.
Unfortunately, some times the attempt to send data from the client to
the remote would be enqueued but not processed by the time the test tore
down everything. As a result, the data stream could get closed before
the client's data was transmitted to the server. If this happened, you'd
see an error such as 'forward 2 ports with bidirectional data: server
expected to receive "ghij", got "" for port 6000'.

This fixes the test by serializing the data flow: the client writes to
the remote, the remote waits to receive that data, the remote writes to
the client, and the client waits to receive the data from the remote.
This all takes place prior to the test tearing down port forwarding.
This commit is contained in:
Andy Goldstein 2015-11-24 15:38:11 -05:00
parent 2c58defc6d
commit 2fae847120

View File

@ -207,6 +207,8 @@ func TestGetListener(t *testing.T) {
// kubelet.PortForwarder.
type fakePortForwarder struct {
lock sync.Mutex
// stores data expected from the stream per port
expected map[uint16]string
// stores data received from the stream per port
received map[uint16]string
// data to be sent to the stream per port
@ -218,33 +220,23 @@ var _ kubelet.PortForwarder = &fakePortForwarder{}
func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
defer stream.Close()
var wg sync.WaitGroup
// client -> server
wg.Add(1)
go func() {
defer wg.Done()
// copy from stream into a buffer
received := new(bytes.Buffer)
io.Copy(received, stream)
// read from the client
received := make([]byte, len(pf.expected[port]))
n, err := stream.Read(received)
if err != nil {
return fmt.Errorf("error reading from client for port %d: %v", port, err)
}
if n != len(pf.expected[port]) {
return fmt.Errorf("unexpected length read from client for port %d: got %d, expected %d. data=%q", port, n, len(pf.expected[port]), string(received))
}
// store the received content
pf.lock.Lock()
pf.received[port] = received.String()
pf.received[port] = string(received)
pf.lock.Unlock()
}()
// server -> client
wg.Add(1)
go func() {
defer wg.Done()
// send the hardcoded data to the stream
// send the hardcoded data to the client
io.Copy(stream, strings.NewReader(pf.send[port]))
}()
wg.Wait()
return nil
}
@ -254,6 +246,7 @@ func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16
func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[uint16]string) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
pf := &fakePortForwarder{
expected: expectedFromClient,
received: make(map[uint16]string),
send: serverSends,
}