Merge pull request #16590 from ncdc/create-all-streams-before-copying

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot
2015-12-02 16:51:49 -08:00
2 changed files with 83 additions and 42 deletions

View File

@@ -55,12 +55,57 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
} }
} }
var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)
// set up all the streams first
headers := http.Header{} headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError) headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err := conn.CreateStream(headers) errorStream, err = conn.CreateStream(headers)
if err != nil { if err != nil {
return err return err
} }
defer errorStream.Reset()
// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
// goroutines until it's received all of the streams. If the client creates the stdin stream and
// immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
// spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
// getting processed because the server hasn't started its copying, and it won't do that until it
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
if e.stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdin.Reset()
}
if e.stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdout.Reset()
}
if e.stderr != nil && !e.tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStderr.Reset()
}
// now that all the streams have been created, proceed with reading & copying
// always read from errorStream
go func() { go func() {
message, err := ioutil.ReadAll(errorStream) message, err := ioutil.ReadAll(errorStream)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
@@ -72,15 +117,8 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
return return
} }
}() }()
defer errorStream.Reset()
if e.stdin != nil { if e.stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err := conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdin.Reset()
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks) // TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
// because stdin is not closed until the process exits. If we try to call // because stdin is not closed until the process exits. If we try to call
// stdin.Close(), it returns no error but doesn't unblock the copy. It will // stdin.Close(), it returns no error but doesn't unblock the copy. It will
@@ -93,23 +131,11 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
if e.stdout != nil { if e.stdout != nil {
waitCount++ waitCount++
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err := conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdout.Reset()
go cp(api.StreamTypeStdout, e.stdout, remoteStdout) go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
} }
if e.stderr != nil && !e.tty { if e.stderr != nil && !e.tty {
waitCount++ waitCount++
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err := conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStderr.Reset()
go cp(api.StreamTypeStderr, e.stderr, remoteStderr) go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
} }

View File

@@ -42,16 +42,52 @@ type streamProtocolV2 struct {
var _ streamProtocolHandler = &streamProtocolV2{} var _ streamProtocolHandler = &streamProtocolV2{}
func (e *streamProtocolV2) stream(conn httpstream.Connection) error { func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)
headers := http.Header{} headers := http.Header{}
// set up all the streams first
// set up error stream // set up error stream
errorChan := make(chan error) errorChan := make(chan error)
headers.Set(api.StreamType, api.StreamTypeError) headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err := conn.CreateStream(headers) errorStream, err = conn.CreateStream(headers)
if err != nil { if err != nil {
return err return err
} }
// set up stdin stream
if e.stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
// set up stdout stream
if e.stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
// set up stderr stream
if e.stderr != nil && !e.tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
// now that all the streams have been created, proceed with reading & copying
// always read from errorStream
go func() { go func() {
message, err := ioutil.ReadAll(errorStream) message, err := ioutil.ReadAll(errorStream)
switch { switch {
@@ -68,14 +104,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var once sync.Once var once sync.Once
// set up stdin stream
if e.stdin != nil { if e.stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err := conn.CreateStream(headers)
if err != nil {
return err
}
// copy from client's stdin to container's stdin // copy from client's stdin to container's stdin
go func() { go func() {
// if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
@@ -109,14 +138,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
}() }()
} }
// set up stdout stream
if e.stdout != nil { if e.stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err := conn.CreateStream(headers)
if err != nil {
return err
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@@ -126,14 +148,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
}() }()
} }
// set up stderr stream
if e.stderr != nil && !e.tty { if e.stderr != nil && !e.tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err := conn.CreateStream(headers)
if err != nil {
return err
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()