mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Create all streams before copying in exec/attach
Create error, stdin, stdout, stderr streams first, and only start copying once all the streams have been created. This fixes an issue where the client immediately starts sending data for stdin before all the other streams have been created. This ends up blocking the spdy connection frame handler and causes the entire exec/attach session to time out.
This commit is contained in:
parent
cc53619372
commit
e30b8a36d2
@ -55,12 +55,57 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
err error
|
||||
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
|
||||
)
|
||||
|
||||
// set up all the streams first
|
||||
headers := http.Header{}
|
||||
headers.Set(api.StreamType, api.StreamTypeError)
|
||||
errorStream, err := conn.CreateStream(headers)
|
||||
errorStream, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer errorStream.Reset()
|
||||
|
||||
// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
|
||||
// goroutines until it's received all of the streams. If the client creates the stdin stream and
|
||||
// immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
|
||||
// spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
|
||||
// getting processed because the server hasn't started its copying, and it won't do that until it
|
||||
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
|
||||
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
|
||||
if e.stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
remoteStdin, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStdin.Reset()
|
||||
}
|
||||
|
||||
if e.stdout != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
remoteStdout, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStdout.Reset()
|
||||
}
|
||||
|
||||
if e.stderr != nil && !e.tty {
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
remoteStderr, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStderr.Reset()
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
// always read from errorStream
|
||||
go func() {
|
||||
message, err := ioutil.ReadAll(errorStream)
|
||||
if err != nil && err != io.EOF {
|
||||
@ -72,15 +117,8 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
|
||||
return
|
||||
}
|
||||
}()
|
||||
defer errorStream.Reset()
|
||||
|
||||
if e.stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
remoteStdin, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStdin.Reset()
|
||||
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
|
||||
// because stdin is not closed until the process exits. If we try to call
|
||||
// stdin.Close(), it returns no error but doesn't unblock the copy. It will
|
||||
@ -93,23 +131,11 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
|
||||
|
||||
if e.stdout != nil {
|
||||
waitCount++
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
remoteStdout, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStdout.Reset()
|
||||
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
|
||||
}
|
||||
|
||||
if e.stderr != nil && !e.tty {
|
||||
waitCount++
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
remoteStderr, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer remoteStderr.Reset()
|
||||
go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
|
||||
}
|
||||
|
||||
|
@ -42,16 +42,52 @@ type streamProtocolV2 struct {
|
||||
var _ streamProtocolHandler = &streamProtocolV2{}
|
||||
|
||||
func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
|
||||
var (
|
||||
err error
|
||||
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
|
||||
)
|
||||
|
||||
headers := http.Header{}
|
||||
|
||||
// set up all the streams first
|
||||
// set up error stream
|
||||
errorChan := make(chan error)
|
||||
headers.Set(api.StreamType, api.StreamTypeError)
|
||||
errorStream, err := conn.CreateStream(headers)
|
||||
errorStream, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set up stdin stream
|
||||
if e.stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
remoteStdin, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set up stdout stream
|
||||
if e.stdout != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
remoteStdout, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set up stderr stream
|
||||
if e.stderr != nil && !e.tty {
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
remoteStderr, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
// always read from errorStream
|
||||
go func() {
|
||||
message, err := ioutil.ReadAll(errorStream)
|
||||
switch {
|
||||
@ -68,14 +104,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
|
||||
var wg sync.WaitGroup
|
||||
var once sync.Once
|
||||
|
||||
// set up stdin stream
|
||||
if e.stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
remoteStdin, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// copy from client's stdin to container's stdin
|
||||
go func() {
|
||||
// if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
|
||||
@ -109,14 +138,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
|
||||
}()
|
||||
}
|
||||
|
||||
// set up stdout stream
|
||||
if e.stdout != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
remoteStdout, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@ -126,14 +148,7 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
|
||||
}()
|
||||
}
|
||||
|
||||
// set up stderr stream
|
||||
if e.stderr != nil && !e.tty {
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
remoteStderr, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
Loading…
Reference in New Issue
Block a user