diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index 21091c85250..69f150b8630 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -19,16 +19,12 @@ package remotecommand import ( "fmt" "io" - "io/ioutil" "net/http" "net/url" - "sync" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" ) @@ -191,219 +187,3 @@ func (e *streamExecutor) Stream(stdin io.Reader, stdout, stderr io.Writer, tty b return streamer.stream(conn) } - -type streamProtocolV1 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool -} - -func (e *streamProtocolV1) stream(conn httpstream.Connection) error { - doneChan := make(chan struct{}, 2) - errorChan := make(chan error) - - cp := func(s string, dst io.Writer, src io.Reader) { - glog.V(4).Infof("Copying %s", s) - defer glog.V(4).Infof("Done copying %s", s) - if _, err := io.Copy(dst, src); err != nil && err != io.EOF { - glog.Errorf("Error copying %s: %v", s, err) - } - if s == api.StreamTypeStdout || s == api.StreamTypeStderr { - doneChan <- struct{}{} - } - } - - headers := http.Header{} - headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err := conn.CreateStream(headers) - if err != nil { - return err - } - go func() { - message, err := ioutil.ReadAll(errorStream) - if err != nil && err != io.EOF { - errorChan <- fmt.Errorf("Error reading from error stream: %s", err) - return - } - if len(message) > 0 { - errorChan <- fmt.Errorf("Error executing remote command: %s", message) - return - } - }() - defer errorStream.Reset() - - if e.stdin != nil { - headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStdin.Reset() - // TODO this goroutine will never exit cleanly (the io.Copy never unblocks) - // because stdin is not closed until the process exits. If we try to call - // stdin.Close(), it returns no error but doesn't unblock the copy. It will - // exit when the process exits, instead. - go cp(api.StreamTypeStdin, remoteStdin, e.stdin) - } - - waitCount := 0 - completedStreams := 0 - - if e.stdout != nil { - waitCount++ - headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStdout.Reset() - go cp(api.StreamTypeStdout, e.stdout, remoteStdout) - } - - if e.stderr != nil && !e.tty { - waitCount++ - headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err := conn.CreateStream(headers) - if err != nil { - return err - } - defer remoteStderr.Reset() - go cp(api.StreamTypeStderr, e.stderr, remoteStderr) - } - -Loop: - for { - select { - case <-doneChan: - completedStreams++ - if completedStreams == waitCount { - break Loop - } - case err := <-errorChan: - return err - } - } - - return nil -} - -// streamProtocolV2 implements version 2 of the streaming protocol for attach -// and exec. The original streaming protocol was unversioned. As a result, this -// version is referred to as version 2, even though it is the first actual -// numbered version. -type streamProtocolV2 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool -} - -func (e *streamProtocolV2) stream(conn httpstream.Connection) error { - headers := http.Header{} - - // set up error stream - errorChan := make(chan error) - headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err := conn.CreateStream(headers) - if err != nil { - return err - } - - go func() { - message, err := ioutil.ReadAll(errorStream) - switch { - case err != nil && err != io.EOF: - errorChan <- fmt.Errorf("error reading from error stream: %s", err) - case len(message) > 0: - errorChan <- fmt.Errorf("error executing remote command: %s", message) - default: - errorChan <- nil - } - close(errorChan) - }() - - var wg sync.WaitGroup - var once sync.Once - - // set up stdin stream - if e.stdin != nil { - headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err := conn.CreateStream(headers) - if err != nil { - return err - } - - // copy from client's stdin to container's stdin - go func() { - // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i -- cat`, make sure - // we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise - // the executed command will remain running. - defer once.Do(func() { remoteStdin.Close() }) - - if _, err := io.Copy(remoteStdin, e.stdin); err != nil { - util.HandleError(err) - } - }() - - // read from remoteStdin until the stream is closed. this is essential to - // be able to exit interactive sessions cleanly and not leak goroutines or - // hang the client's terminal. - // - // go-dockerclient's current hijack implementation - // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564) - // waits for all three streams (stdin/stdout/stderr) to finish copying - // before returning. When hijack finishes copying stdout/stderr, it calls - // Close() on its side of remoteStdin, which allows this copy to complete. - // When that happens, we must Close() on our side of remoteStdin, to - // allow the copy in hijack to complete, and hijack to return. - go func() { - defer once.Do(func() { remoteStdin.Close() }) - // this "copy" doesn't actually read anything - it's just here to wait for - // the server to close remoteStdin. - if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil { - util.HandleError(err) - } - }() - } - - // set up stdout stream - if e.stdout != nil { - headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err := conn.CreateStream(headers) - if err != nil { - return err - } - - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stdout, remoteStdout); err != nil { - util.HandleError(err) - } - }() - } - - // set up stderr stream - if e.stderr != nil && !e.tty { - headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err := conn.CreateStream(headers) - if err != nil { - return err - } - - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stderr, remoteStderr); err != nil { - util.HandleError(err) - } - }() - } - - // we're waiting for stdout/stderr to finish copying - wg.Wait() - - // waits for errorStream to finish reading with an error or nil - return <-errorChan -} diff --git a/pkg/client/unversioned/remotecommand/v1.go b/pkg/client/unversioned/remotecommand/v1.go new file mode 100644 index 00000000000..1a64ed048cc --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v1.go @@ -0,0 +1,126 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/httpstream" +) + +type streamProtocolV1 struct { + stdin io.Reader + stdout io.Writer + stderr io.Writer + tty bool +} + +var _ streamProtocolHandler = &streamProtocolV1{} + +func (e *streamProtocolV1) stream(conn httpstream.Connection) error { + doneChan := make(chan struct{}, 2) + errorChan := make(chan error) + + cp := func(s string, dst io.Writer, src io.Reader) { + glog.V(4).Infof("Copying %s", s) + defer glog.V(4).Infof("Done copying %s", s) + if _, err := io.Copy(dst, src); err != nil && err != io.EOF { + glog.Errorf("Error copying %s: %v", s, err) + } + if s == api.StreamTypeStdout || s == api.StreamTypeStderr { + doneChan <- struct{}{} + } + } + + headers := http.Header{} + headers.Set(api.StreamType, api.StreamTypeError) + errorStream, err := conn.CreateStream(headers) + if err != nil { + return err + } + go func() { + message, err := ioutil.ReadAll(errorStream) + if err != nil && err != io.EOF { + errorChan <- fmt.Errorf("Error reading from error stream: %s", err) + return + } + if len(message) > 0 { + errorChan <- fmt.Errorf("Error executing remote command: %s", message) + return + } + }() + defer errorStream.Reset() + + if e.stdin != nil { + headers.Set(api.StreamType, api.StreamTypeStdin) + remoteStdin, err := conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStdin.Reset() + // TODO this goroutine will never exit cleanly (the io.Copy never unblocks) + // because stdin is not closed until the process exits. If we try to call + // stdin.Close(), it returns no error but doesn't unblock the copy. It will + // exit when the process exits, instead. + go cp(api.StreamTypeStdin, remoteStdin, e.stdin) + } + + waitCount := 0 + completedStreams := 0 + + if e.stdout != nil { + waitCount++ + headers.Set(api.StreamType, api.StreamTypeStdout) + remoteStdout, err := conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStdout.Reset() + go cp(api.StreamTypeStdout, e.stdout, remoteStdout) + } + + if e.stderr != nil && !e.tty { + waitCount++ + headers.Set(api.StreamType, api.StreamTypeStderr) + remoteStderr, err := conn.CreateStream(headers) + if err != nil { + return err + } + defer remoteStderr.Reset() + go cp(api.StreamTypeStderr, e.stderr, remoteStderr) + } + +Loop: + for { + select { + case <-doneChan: + completedStreams++ + if completedStreams == waitCount { + break Loop + } + case err := <-errorChan: + return err + } + } + + return nil +} diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go new file mode 100644 index 00000000000..ca10dda4956 --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -0,0 +1,151 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "sync" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/httpstream" +) + +// streamProtocolV2 implements version 2 of the streaming protocol for attach +// and exec. The original streaming protocol was unversioned. As a result, this +// version is referred to as version 2, even though it is the first actual +// numbered version. +type streamProtocolV2 struct { + stdin io.Reader + stdout io.Writer + stderr io.Writer + tty bool +} + +var _ streamProtocolHandler = &streamProtocolV2{} + +func (e *streamProtocolV2) stream(conn httpstream.Connection) error { + headers := http.Header{} + + // set up error stream + errorChan := make(chan error) + headers.Set(api.StreamType, api.StreamTypeError) + errorStream, err := conn.CreateStream(headers) + if err != nil { + return err + } + + go func() { + message, err := ioutil.ReadAll(errorStream) + switch { + case err != nil && err != io.EOF: + errorChan <- fmt.Errorf("error reading from error stream: %s", err) + case len(message) > 0: + errorChan <- fmt.Errorf("error executing remote command: %s", message) + default: + errorChan <- nil + } + close(errorChan) + }() + + var wg sync.WaitGroup + var once sync.Once + + // set up stdin stream + if e.stdin != nil { + headers.Set(api.StreamType, api.StreamTypeStdin) + remoteStdin, err := conn.CreateStream(headers) + if err != nil { + return err + } + + // copy from client's stdin to container's stdin + go func() { + // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i -- cat`, make sure + // we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise + // the executed command will remain running. + defer once.Do(func() { remoteStdin.Close() }) + + if _, err := io.Copy(remoteStdin, e.stdin); err != nil { + util.HandleError(err) + } + }() + + // read from remoteStdin until the stream is closed. this is essential to + // be able to exit interactive sessions cleanly and not leak goroutines or + // hang the client's terminal. + // + // go-dockerclient's current hijack implementation + // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564) + // waits for all three streams (stdin/stdout/stderr) to finish copying + // before returning. When hijack finishes copying stdout/stderr, it calls + // Close() on its side of remoteStdin, which allows this copy to complete. + // When that happens, we must Close() on our side of remoteStdin, to + // allow the copy in hijack to complete, and hijack to return. + go func() { + defer once.Do(func() { remoteStdin.Close() }) + // this "copy" doesn't actually read anything - it's just here to wait for + // the server to close remoteStdin. + if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil { + util.HandleError(err) + } + }() + } + + // set up stdout stream + if e.stdout != nil { + headers.Set(api.StreamType, api.StreamTypeStdout) + remoteStdout, err := conn.CreateStream(headers) + if err != nil { + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + if _, err := io.Copy(e.stdout, remoteStdout); err != nil { + util.HandleError(err) + } + }() + } + + // set up stderr stream + if e.stderr != nil && !e.tty { + headers.Set(api.StreamType, api.StreamTypeStderr) + remoteStderr, err := conn.CreateStream(headers) + if err != nil { + return err + } + + wg.Add(1) + go func() { + defer wg.Done() + if _, err := io.Copy(e.stderr, remoteStderr); err != nil { + util.HandleError(err) + } + }() + } + + // we're waiting for stdout/stderr to finish copying + wg.Wait() + + // waits for errorStream to finish reading with an error or nil + return <-errorChan +}