diff --git a/pkg/client/unversioned/remotecommand/v1.go b/pkg/client/unversioned/remotecommand/v1.go index b10e5e1f1e7..f5428e0f953 100644 --- a/pkg/client/unversioned/remotecommand/v1.go +++ b/pkg/client/unversioned/remotecommand/v1.go @@ -55,12 +55,57 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { } } + var ( + err error + errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream + ) + + // set up all the streams first headers := http.Header{} headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err := conn.CreateStream(headers) + errorStream, err = conn.CreateStream(headers) if err != nil { return err } + defer errorStream.Reset() + + // Create all the streams first, then start the copy goroutines. The server doesn't start its copy + // goroutines until it's received all of the streams. If the client creates the stdin stream and + // immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the + // spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't + // getting processed because the server hasn't started its copying, and it won't do that until it + // gets all the streams. By creating all the streams first, we ensure that the server is ready to + // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info. + if e.stdin != nil { + headers.Set(api.StreamType, api.StreamTypeStdin) + remoteStdin, err = conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStdin.Reset() + } + + if e.stdout != nil { + headers.Set(api.StreamType, api.StreamTypeStdout) + remoteStdout, err = conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStdout.Reset() + } + + if e.stderr != nil && !e.tty { + headers.Set(api.StreamType, api.StreamTypeStderr) + remoteStderr, err = conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStderr.Reset() + } + + // now that all the streams have been created, proceed with reading & copying + + // always read from errorStream go func() { message, err := ioutil.ReadAll(errorStream) if err != nil && err != io.EOF { @@ -72,15 +117,8 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { return } }() - defer errorStream.Reset() if e.stdin != nil { - headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStdin.Reset() // TODO this goroutine will never exit cleanly (the io.Copy never unblocks) // because stdin is not closed until the process exits. If we try to call // stdin.Close(), it returns no error but doesn't unblock the copy. It will @@ -93,23 +131,11 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { if e.stdout != nil { waitCount++ - headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStdout.Reset() go cp(api.StreamTypeStdout, e.stdout, remoteStdout) } if e.stderr != nil && !e.tty { waitCount++ - headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStderr.Reset() go cp(api.StreamTypeStderr, e.stderr, remoteStderr) } diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go index ca10dda4956..16ae32fab6b 100644 --- a/pkg/client/unversioned/remotecommand/v2.go +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -42,16 +42,52 @@ type streamProtocolV2 struct { var _ streamProtocolHandler = &streamProtocolV2{} func (e *streamProtocolV2) stream(conn httpstream.Connection) error { + var ( + err error + errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream + ) + headers := http.Header{} + // set up all the streams first // set up error stream errorChan := make(chan error) headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err := conn.CreateStream(headers) + errorStream, err = conn.CreateStream(headers) if err != nil { return err } + // set up stdin stream + if e.stdin != nil { + headers.Set(api.StreamType, api.StreamTypeStdin) + remoteStdin, err = conn.CreateStream(headers) + if err != nil { + return err + } + } + + // set up stdout stream + if e.stdout != nil { + headers.Set(api.StreamType, api.StreamTypeStdout) + remoteStdout, err = conn.CreateStream(headers) + if err != nil { + return err + } + } + + // set up stderr stream + if e.stderr != nil && !e.tty { + headers.Set(api.StreamType, api.StreamTypeStderr) + remoteStderr, err = conn.CreateStream(headers) + if err != nil { + return err + } + } + + // now that all the streams have been created, proceed with reading & copying + + // always read from errorStream go func() { message, err := ioutil.ReadAll(errorStream) switch { @@ -68,14 +104,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { var wg sync.WaitGroup var once sync.Once - // set up stdin stream if e.stdin != nil { - headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err := conn.CreateStream(headers) - if err != nil { - return err - } - // copy from client's stdin to container's stdin go func() { // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i -- cat`, make sure @@ -109,14 +138,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { }() } - // set up stdout stream if e.stdout != nil { - headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err := conn.CreateStream(headers) - if err != nil { - return err - } - wg.Add(1) go func() { defer wg.Done() @@ -126,14 +148,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { }() } - // set up stderr stream if e.stderr != nil && !e.tty { - headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err := conn.CreateStream(headers) - if err != nil { - return err - } - wg.Add(1) go func() { defer wg.Done()