From ce7f003f57f6938c6f513e2880ed23f35f1160db Mon Sep 17 00:00:00 2001 From: bindata-mockuser Date: Thu, 4 Aug 2016 18:39:12 +0200 Subject: [PATCH 1/3] Add protocol versions to pkg/util/wsstream --- pkg/apiserver/apiserver.go | 2 +- pkg/util/wsstream/conn.go | 99 +++++++++++++++++---------- pkg/util/wsstream/conn_test.go | 113 +++++++++++++++++++++++++++++-- pkg/util/wsstream/stream.go | 55 +++++++++++---- pkg/util/wsstream/stream_test.go | 113 +++++++++++++++++++++++-------- 5 files changed, 302 insertions(+), 80 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 1d4b07c1680..54839a058d5 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -451,7 +451,7 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri defer out.Close() if wsstream.IsWebSocketRequest(req) { - r := wsstream.NewReader(out, true) + r := wsstream.NewReader(out, true, wsstream.NewDefaultReaderProtocols()) if err := r.Copy(w, req); err != nil { utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err)) } diff --git a/pkg/util/wsstream/conn.go b/pkg/util/wsstream/conn.go index eb0b4b28bb1..94b75568eec 100644 --- a/pkg/util/wsstream/conn.go +++ b/pkg/util/wsstream/conn.go @@ -27,6 +27,7 @@ import ( "github.com/golang/glog" "golang.org/x/net/websocket" + "k8s.io/kubernetes/pkg/util/runtime" ) @@ -44,7 +45,7 @@ import ( // READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT) // CLOSE // -const channelWebSocketProtocol = "channel.k8s.io" +const ChannelWebSocketProtocol = "channel.k8s.io" // The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character // indicating the channel number (zero indexed) the message was sent on. Messages in both directions @@ -60,7 +61,7 @@ const channelWebSocketProtocol = "channel.k8s.io" // READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT) // CLOSE // -const base64ChannelWebSocketProtocol = "base64.channel.k8s.io" +const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io" type codecType int @@ -107,8 +108,9 @@ func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) { func handshake(config *websocket.Config, req *http.Request, allowed []string) error { protocols := config.Protocol if len(protocols) == 0 { - return nil + protocols = []string{""} } + for _, protocol := range protocols { for _, allow := range allowed { if allow == protocol { @@ -117,41 +119,50 @@ func handshake(config *websocket.Config, req *http.Request, allowed []string) er } } } + return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed) } +// ChannelProtocolConfig describes a websocket subprotocol with channels. +type ChannelProtocolConfig struct { + Binary bool + Channels []ChannelType +} + +// NewDefaultChannelProtocols returns a channel protocol map with the +// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given +// channels. +func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig { + return map[string]ChannelProtocolConfig{ + "": {Binary: true, Channels: channels}, + ChannelWebSocketProtocol: {Binary: true, Channels: channels}, + Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels}, + } +} + // Conn supports sending multiple binary channels over a websocket connection. -// Supports only the "channel.k8s.io" subprotocol. type Conn struct { - channels []*websocketChannel - codec codecType - ready chan struct{} - ws *websocket.Conn - timeout time.Duration + protocols map[string]ChannelProtocolConfig + selectedProtocol string + channels []*websocketChannel + codec codecType + ready chan struct{} + ws *websocket.Conn + timeout time.Duration } // NewConn creates a WebSocket connection that supports a set of channels. Channels begin each // web socket message with a single byte indicating the channel number (0-N). 255 is reserved for // future use. The channel types for each channel are passed as an array, supporting the different // duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer. -func NewConn(channels ...ChannelType) *Conn { - conn := &Conn{ - ready: make(chan struct{}), - channels: make([]*websocketChannel, len(channels)), +// +// The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol +// name is used if websocket.Config.Protocol is empty. +func NewConn(protocols map[string]ChannelProtocolConfig) *Conn { + return &Conn{ + ready: make(chan struct{}), + protocols: protocols, } - for i := range conn.channels { - switch channels[i] { - case ReadChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false) - case WriteChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true) - case ReadWriteChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true) - case IgnoreChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false) - } - } - return conn } // SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified, @@ -160,8 +171,9 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) { conn.timeout = duration } -// Open the connection and create channels for reading and writing. -func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) { +// Open the connection and create channels for reading and writing. It returns +// the selected subprotocol, a slice of channels and an error. +func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) { go func() { defer runtime.HandleCrash() defer conn.Close() @@ -172,23 +184,42 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWrite for i := range conn.channels { rwc[i] = conn.channels[i] } - return rwc, nil + return conn.selectedProtocol, rwc, nil } func (conn *Conn) initialize(ws *websocket.Conn) { - protocols := ws.Config().Protocol - switch { - case len(protocols) == 0, protocols[0] == channelWebSocketProtocol: + negotiated := ws.Config().Protocol + conn.selectedProtocol = negotiated[0] + p := conn.protocols[conn.selectedProtocol] + if p.Binary { conn.codec = rawCodec - case protocols[0] == base64ChannelWebSocketProtocol: + } else { conn.codec = base64Codec } conn.ws = ws + conn.channels = make([]*websocketChannel, len(p.Channels)) + for i, t := range p.Channels { + switch t { + case ReadChannel: + conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false) + case WriteChannel: + conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true) + case ReadWriteChannel: + conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true) + case IgnoreChannel: + conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false) + } + } + close(conn.ready) } func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error { - return handshake(config, req, []string{channelWebSocketProtocol, base64ChannelWebSocketProtocol}) + supportedProtocols := make([]string, 0, len(conn.protocols)) + for p := range conn.protocols { + supportedProtocols = append(supportedProtocols, p) + } + return handshake(config, req, supportedProtocols) } func (conn *Conn) resetTimeout() { diff --git a/pkg/util/wsstream/conn_test.go b/pkg/util/wsstream/conn_test.go index 88c84bf162f..1c049aad7a2 100644 --- a/pkg/util/wsstream/conn_test.go +++ b/pkg/util/wsstream/conn_test.go @@ -20,6 +20,7 @@ import ( "encoding/base64" "io" "io/ioutil" + "net/http" "net/http/httptest" "reflect" "sync" @@ -28,15 +29,19 @@ import ( "golang.org/x/net/websocket" ) -func newServer(handler websocket.Handler) (*httptest.Server, string) { +func newServer(handler http.Handler) (*httptest.Server, string) { server := httptest.NewServer(handler) serverAddr := server.Listener.Addr().String() return server, serverAddr } func TestRawConn(t *testing.T) { - conn := NewConn(ReadWriteChannel, ReadWriteChannel, IgnoreChannel, ReadChannel, WriteChannel) - s, addr := newServer(conn.handle) + channels := []ChannelType{ReadWriteChannel, ReadWriteChannel, IgnoreChannel, ReadChannel, WriteChannel} + conn := NewConn(NewDefaultChannelProtocols(channels)) + + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + conn.Open(w, req) + })) defer s.Close() client, err := websocket.Dial("ws://"+addr, "", "http://localhost/") @@ -112,8 +117,10 @@ func TestRawConn(t *testing.T) { } func TestBase64Conn(t *testing.T) { - conn := NewConn(ReadWriteChannel, ReadWriteChannel) - s, addr := newServer(conn.handle) + conn := NewConn(NewDefaultChannelProtocols([]ChannelType{ReadWriteChannel, ReadWriteChannel})) + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + conn.Open(w, req) + })) defer s.Close() config, err := websocket.NewConfig("ws://"+addr, "http://localhost/") @@ -167,3 +174,99 @@ func TestBase64Conn(t *testing.T) { client.Close() wg.Wait() } + +type versionTest struct { + supported map[string]bool // protocol -> binary + requested []string + error bool + expected string +} + +func versionTests() []versionTest { + const ( + binary = true + base64 = false + ) + return []versionTest{ + { + supported: nil, + requested: []string{"raw"}, + error: true, + }, + { + supported: map[string]bool{"": binary, "raw": binary, "base64": base64}, + requested: nil, + expected: "", + }, + { + supported: map[string]bool{"": binary, "raw": binary, "base64": base64}, + requested: []string{"v1.raw"}, + error: true, + }, + { + supported: map[string]bool{"": binary, "raw": binary, "base64": base64}, + requested: []string{"v1.raw", "v1.base64"}, + error: true, + }, { + supported: map[string]bool{"": binary, "raw": binary, "base64": base64}, + requested: []string{"v1.raw", "raw"}, + expected: "raw", + }, + { + supported: map[string]bool{"": binary, "v1.raw": binary, "v1.base64": base64, "v2.raw": binary, "v2.base64": base64}, + requested: []string{"v1.raw"}, + expected: "v1.raw", + }, + { + supported: map[string]bool{"": binary, "v1.raw": binary, "v1.base64": base64, "v2.raw": binary, "v2.base64": base64}, + requested: []string{"v2.base64"}, + expected: "v2.base64", + }, + } +} + +func TestVersionedConn(t *testing.T) { + for i, test := range versionTests() { + func() { + supportedProtocols := map[string]ChannelProtocolConfig{} + for p, binary := range test.supported { + supportedProtocols[p] = ChannelProtocolConfig{ + Binary: binary, + Channels: []ChannelType{ReadWriteChannel}, + } + } + conn := NewConn(supportedProtocols) + // note that it's not enough to wait for conn.ready to avoid a race here. Hence, + // we use a channel. + selectedProtocol := make(chan string, 0) + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + p, _, _ := conn.Open(w, req) + selectedProtocol <- p + })) + defer s.Close() + + config, err := websocket.NewConfig("ws://"+addr, "http://localhost/") + if err != nil { + t.Fatal(err) + } + config.Protocol = test.requested + client, err := websocket.DialConfig(config) + if err != nil { + if !test.error { + t.Fatalf("test %d: didn't expect error: %v", i, err) + } else { + return + } + } + defer client.Close() + if test.error && err == nil { + t.Fatalf("test %d: expected an error", i) + } + + <-conn.ready + if got, expected := <-selectedProtocol, test.expected; got != expected { + t.Fatalf("test %d: unexpected protocol version: got=%s expected=%s", i, got, expected) + } + }() + } +} diff --git a/pkg/util/wsstream/stream.go b/pkg/util/wsstream/stream.go index d16d99817e3..2a8326cd8ab 100644 --- a/pkg/util/wsstream/stream.go +++ b/pkg/util/wsstream/stream.go @@ -23,6 +23,7 @@ import ( "time" "golang.org/x/net/websocket" + "k8s.io/kubernetes/pkg/util/runtime" ) @@ -37,23 +38,46 @@ const binaryWebSocketProtocol = "binary.k8s.io" // possible. const base64BinaryWebSocketProtocol = "base64.binary.k8s.io" +// ReaderProtocolConfig describes a websocket subprotocol with one stream. +type ReaderProtocolConfig struct { + Binary bool +} + +// NewDefaultReaderProtocols returns a stream protocol map with the +// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io". +func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig { + return map[string]ReaderProtocolConfig{ + "": {Binary: true}, + binaryWebSocketProtocol: {Binary: true}, + base64BinaryWebSocketProtocol: {Binary: false}, + } +} + // Reader supports returning an arbitrary byte stream over a websocket channel. -// Supports the "binary.k8s.io" and "base64.binary.k8s.io" subprotocols. type Reader struct { - err chan error - r io.Reader - ping bool - timeout time.Duration + err chan error + r io.Reader + ping bool + timeout time.Duration + protocols map[string]ReaderProtocolConfig + selectedProtocol string + + handleCrash func() // overridable for testing } // NewReader creates a WebSocket pipe that will copy the contents of r to a provided // WebSocket connection. If ping is true, a zero length message will be sent to the client // before the stream begins reading. -func NewReader(r io.Reader, ping bool) *Reader { +// +// The protocols parameter maps subprotocol names to StreamProtocols. The empty string +// subprotocol name is used if websocket.Config.Protocol is empty. +func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader { return &Reader{ - r: r, - err: make(chan error), - ping: ping, + r: r, + err: make(chan error), + ping: ping, + protocols: protocols, + handleCrash: func() { runtime.HandleCrash() }, } } @@ -64,14 +88,18 @@ func (r *Reader) SetIdleTimeout(duration time.Duration) { } func (r *Reader) handshake(config *websocket.Config, req *http.Request) error { - return handshake(config, req, []string{binaryWebSocketProtocol, base64BinaryWebSocketProtocol}) + supportedProtocols := make([]string, 0, len(r.protocols)) + for p := range r.protocols { + supportedProtocols = append(supportedProtocols, p) + } + return handshake(config, req, supportedProtocols) } // Copy the reader to the response. The created WebSocket is closed after this // method completes. func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { go func() { - defer runtime.HandleCrash() + defer r.handleCrash() websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req) }() return <-r.err @@ -79,11 +107,12 @@ func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { // handle implements a WebSocket handler. func (r *Reader) handle(ws *websocket.Conn) { - encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol + negotiated := ws.Config().Protocol + r.selectedProtocol = negotiated[0] defer close(r.err) defer ws.Close() go IgnoreReceives(ws, r.timeout) - r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout) + r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout) } func resetTimeout(ws *websocket.Conn, timeout time.Duration) { diff --git a/pkg/util/wsstream/stream_test.go b/pkg/util/wsstream/stream_test.go index aa37b2ddfe5..09dda761f8c 100644 --- a/pkg/util/wsstream/stream_test.go +++ b/pkg/util/wsstream/stream_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "reflect" "strings" "testing" @@ -32,7 +33,7 @@ import ( func TestStream(t *testing.T) { input := "some random text" - r := NewReader(bytes.NewBuffer([]byte(input)), true) + r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) r.SetIdleTimeout(time.Second) data, err := readWebSocket(r, t, nil) if !reflect.DeepEqual(data, []byte(input)) { @@ -45,7 +46,7 @@ func TestStream(t *testing.T) { func TestStreamPing(t *testing.T) { input := "some random text" - r := NewReader(bytes.NewBuffer([]byte(input)), true) + r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) r.SetIdleTimeout(time.Second) err := expectWebSocketFrames(r, t, nil, [][]byte{ {}, @@ -59,8 +60,8 @@ func TestStreamPing(t *testing.T) { func TestStreamBase64(t *testing.T) { input := "some random text" encoded := base64.StdEncoding.EncodeToString([]byte(input)) - r := NewReader(bytes.NewBuffer([]byte(input)), true) - data, err := readWebSocket(r, t, nil, base64BinaryWebSocketProtocol) + r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) + data, err := readWebSocket(r, t, nil, "base64.binary.k8s.io") if !reflect.DeepEqual(data, []byte(encoded)) { t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded)) } @@ -69,6 +70,73 @@ func TestStreamBase64(t *testing.T) { } } +func TestStreamVersionedBase64(t *testing.T) { + input := "some random text" + encoded := base64.StdEncoding.EncodeToString([]byte(input)) + r := NewReader(bytes.NewBuffer([]byte(input)), true, map[string]ReaderProtocolConfig{ + "": {Binary: true}, + "binary.k8s.io": {Binary: true}, + "base64.binary.k8s.io": {Binary: false}, + "v1.binary.k8s.io": {Binary: true}, + "v1.base64.binary.k8s.io": {Binary: false}, + "v2.binary.k8s.io": {Binary: true}, + "v2.base64.binary.k8s.io": {Binary: false}, + }) + data, err := readWebSocket(r, t, nil, "v2.base64.binary.k8s.io") + if !reflect.DeepEqual(data, []byte(encoded)) { + t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded)) + } + if err != nil { + t.Fatal(err) + } +} + +func TestStreamVersionedCopy(t *testing.T) { + for i, test := range versionTests() { + func() { + supportedProtocols := map[string]ReaderProtocolConfig{} + for p, binary := range test.supported { + supportedProtocols[p] = ReaderProtocolConfig{ + Binary: binary, + } + } + input := "some random text" + r := NewReader(bytes.NewBuffer([]byte(input)), true, supportedProtocols) + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + err := r.Copy(w, req) + if err != nil { + w.WriteHeader(503) + } + })) + defer s.Close() + + config, err := websocket.NewConfig("ws://"+addr, "http://localhost/") + if err != nil { + t.Error(err) + return + } + config.Protocol = test.requested + client, err := websocket.DialConfig(config) + if err != nil { + if !test.error { + t.Errorf("test %d: didn't expect error: %v", i, err) + } + return + } + defer client.Close() + if test.error && err == nil { + t.Errorf("test %d: expected an error", i) + return + } + + <-r.err + if got, expected := r.selectedProtocol, test.expected; got != expected { + t.Errorf("test %d: unexpected protocol version: got=%s expected=%s", i, got, expected) + } + }() + } +} + func TestStreamError(t *testing.T) { input := "some random text" errs := &errorReader{ @@ -78,7 +146,7 @@ func TestStreamError(t *testing.T) { }, err: fmt.Errorf("bad read"), } - r := NewReader(errs, false) + r := NewReader(errs, false, NewDefaultReaderProtocols()) data, err := readWebSocket(r, t, nil) if !reflect.DeepEqual(data, []byte(input)) { @@ -98,7 +166,10 @@ func TestStreamSurvivesPanic(t *testing.T) { }, panicMessage: "bad read", } - r := NewReader(errs, false) + r := NewReader(errs, false, NewDefaultReaderProtocols()) + + // do not call runtime.HandleCrash() in handler. Otherwise, the tests are interrupted. + r.handleCrash = func() { recover() } data, err := readWebSocket(r, t, nil) if !reflect.DeepEqual(data, []byte(input)) { @@ -121,7 +192,7 @@ func TestStreamClosedDuringRead(t *testing.T) { err: fmt.Errorf("stuff"), pause: ch, } - r := NewReader(errs, false) + r := NewReader(errs, false, NewDefaultReaderProtocols()) data, err := readWebSocket(r, t, func(c *websocket.Conn) { c.Close() @@ -163,19 +234,13 @@ func (r *errorReader) Read(p []byte) (int, error) { func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols ...string) ([]byte, error) { errCh := make(chan error, 1) - s, addr := newServer(func(ws *websocket.Conn) { - cfg := ws.Config() - cfg.Protocol = protocols - go IgnoreReceives(ws, 0) - go func() { - err := <-r.err - errCh <- err - }() - r.handle(ws) - }) + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + errCh <- r.Copy(w, req) + })) defer s.Close() config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr) + config.Protocol = protocols client, err := websocket.DialConfig(config) if err != nil { return nil, err @@ -195,19 +260,13 @@ func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), frames [][]byte, protocols ...string) error { errCh := make(chan error, 1) - s, addr := newServer(func(ws *websocket.Conn) { - cfg := ws.Config() - cfg.Protocol = protocols - go IgnoreReceives(ws, 0) - go func() { - err := <-r.err - errCh <- err - }() - r.handle(ws) - }) + s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + errCh <- r.Copy(w, req) + })) defer s.Close() config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr) + config.Protocol = protocols ws, err := websocket.DialConfig(config) if err != nil { return err From 6dcb0c913083d5b46a2eb1a88d5c468e6b64722d Mon Sep 17 00:00:00 2001 From: bindata-mockuser Date: Mon, 8 Aug 2016 20:24:01 +0200 Subject: [PATCH 2/3] Rectify kubectl error output --- pkg/kubectl/cmd/cmd_test.go | 4 +- pkg/kubectl/cmd/drain_test.go | 4 +- pkg/kubectl/cmd/exec.go | 2 +- pkg/kubectl/cmd/taint_test.go | 2 +- pkg/kubectl/cmd/util/helpers.go | 105 ++++++++++++++------------- pkg/kubectl/cmd/util/helpers_test.go | 77 ++++++++------------ pkg/util/exec/exec.go | 10 ++- 7 files changed, 97 insertions(+), 107 deletions(-) diff --git a/pkg/kubectl/cmd/cmd_test.go b/pkg/kubectl/cmd/cmd_test.go index f0ecbfbbfdf..0d375432298 100644 --- a/pkg/kubectl/cmd/cmd_test.go +++ b/pkg/kubectl/cmd/cmd_test.go @@ -46,8 +46,8 @@ import ( ) func initTestErrorHandler(t *testing.T) { - cmdutil.BehaviorOnFatal(func(str string) { - t.Errorf("Error running command: %s", str) + cmdutil.BehaviorOnFatal(func(str string, code int) { + t.Errorf("Error running command (exit code %d): %s", code, str) }) } diff --git a/pkg/kubectl/cmd/drain_test.go b/pkg/kubectl/cmd/drain_test.go index a706de18c7f..a1261b689a1 100644 --- a/pkg/kubectl/cmd/drain_test.go +++ b/pkg/kubectl/cmd/drain_test.go @@ -177,7 +177,7 @@ func TestCordon(t *testing.T) { // Restore cmdutil behavior cmdutil.DefaultBehaviorOnFatal() }() - cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) + cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) cmd.SetArgs([]string{test.arg}) cmd.Execute() }() @@ -521,7 +521,7 @@ func TestDrain(t *testing.T) { // Restore cmdutil behavior cmdutil.DefaultBehaviorOnFatal() }() - cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) + cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) cmd.SetArgs(test.args) cmd.Execute() }() diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index bbfbf05b68a..b8dee1e5024 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -39,7 +39,7 @@ var ( exec_example = dedent.Dedent(` # Get output from running 'date' from pod 123456-7890, using the first container by default kubectl exec 123456-7890 date - + # Get output from running 'date' in ruby-container from pod 123456-7890 kubectl exec 123456-7890 -c ruby-container date diff --git a/pkg/kubectl/cmd/taint_test.go b/pkg/kubectl/cmd/taint_test.go index 0c9427b3823..b8087fd0f60 100644 --- a/pkg/kubectl/cmd/taint_test.go +++ b/pkg/kubectl/cmd/taint_test.go @@ -296,7 +296,7 @@ func TestTaint(t *testing.T) { // Restore cmdutil behavior cmdutil.DefaultBehaviorOnFatal() }() - cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) + cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) cmd.SetArgs(test.args) cmd.Execute() }() diff --git a/pkg/kubectl/cmd/util/helpers.go b/pkg/kubectl/cmd/util/helpers.go index 3419baab3e3..7208e2f98ae 100644 --- a/pkg/kubectl/cmd/util/helpers.go +++ b/pkg/kubectl/cmd/util/helpers.go @@ -50,6 +50,7 @@ import ( const ( ApplyAnnotationsFlag = "save-config" + DefaultErrorExitCode = 1 ) type debugError interface { @@ -74,9 +75,9 @@ func AddSourceToErr(verb string, source string, err error) error { var fatalErrHandler = fatal // BehaviorOnFatal allows you to override the default behavior when a fatal -// error occurs, which is call os.Exit(1). You can pass 'panic' as a function +// error occurs, which is to call os.Exit(code). You can pass 'panic' as a function // here if you prefer the panic() over os.Exit(1). -func BehaviorOnFatal(f func(string)) { +func BehaviorOnFatal(f func(string, int)) { fatalErrHandler = f } @@ -86,19 +87,21 @@ func DefaultBehaviorOnFatal() { fatalErrHandler = fatal } -// fatal prints the message and then exits. If V(2) or greater, glog.Fatal +// fatal prints the message if set and then exits. If V(2) or greater, glog.Fatal // is invoked for extended information. -func fatal(msg string) { - // add newline if needed - if !strings.HasSuffix(msg, "\n") { - msg += "\n" - } +func fatal(msg string, code int) { + if len(msg) > 0 { + // add newline if needed + if !strings.HasSuffix(msg, "\n") { + msg += "\n" + } - if glog.V(2) { - glog.FatalDepth(2, msg) + if glog.V(2) { + glog.FatalDepth(2, msg) + } + fmt.Fprint(os.Stderr, msg) } - fmt.Fprint(os.Stderr, msg) - os.Exit(1) + os.Exit(code) } // CheckErr prints a user friendly error to STDERR and exits with a non-zero @@ -115,51 +118,49 @@ func checkErrWithPrefix(prefix string, err error) { checkErr(prefix, err, fatalErrHandler) } -func checkErr(pref string, err error, handleErr func(string)) { - if err == nil { +// checkErr formats a given error as a string and calls the passed handleErr +// func with that string and an kubectl exit code. +func checkErr(prefix string, err error, handleErr func(string, int)) { + switch { + case err == nil: return - } - - if kerrors.IsInvalid(err) { + case kerrors.IsInvalid(err): details := err.(*kerrors.StatusError).Status().Details - prefix := fmt.Sprintf("%sThe %s %q is invalid.\n", pref, details.Kind, details.Name) - errs := statusCausesToAggrError(details.Causes) - handleErr(MultilineError(prefix, errs)) - return - } - - if noMatch, ok := err.(*meta.NoResourceMatchError); ok { - switch { - case len(noMatch.PartialResource.Group) > 0 && len(noMatch.PartialResource.Version) > 0: - handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q and version %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Group, noMatch.PartialResource.Version)) - case len(noMatch.PartialResource.Group) > 0: - handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Group)) - case len(noMatch.PartialResource.Version) > 0: - handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in version %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Version)) - default: - handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q", pref, noMatch.PartialResource.Resource)) + s := fmt.Sprintf("%sThe %s %q is invalid", prefix, details.Kind, details.Name) + if len(details.Causes) > 0 { + errs := statusCausesToAggrError(details.Causes) + handleErr(MultilineError(s+": ", errs), DefaultErrorExitCode) + } else { + handleErr(s, DefaultErrorExitCode) } - return - } - - // handle multiline errors - if clientcmd.IsConfigurationInvalid(err) { - handleErr(MultilineError(fmt.Sprintf("%sError in configuration: ", pref), err)) - return - } - if agg, ok := err.(utilerrors.Aggregate); ok && len(agg.Errors()) > 0 { - handleErr(MultipleErrors(pref, agg.Errors())) - return - } - - msg, ok := StandardErrorMessage(err) - if !ok { - msg = err.Error() - if !strings.HasPrefix(msg, "error: ") { - msg = fmt.Sprintf("error: %s", msg) + case clientcmd.IsConfigurationInvalid(err): + handleErr(MultilineError(fmt.Sprintf("%sError in configuration: ", prefix), err), DefaultErrorExitCode) + default: + switch err := err.(type) { + case *meta.NoResourceMatchError: + switch { + case len(err.PartialResource.Group) > 0 && len(err.PartialResource.Version) > 0: + handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q and version %q", prefix, err.PartialResource.Resource, err.PartialResource.Group, err.PartialResource.Version), DefaultErrorExitCode) + case len(err.PartialResource.Group) > 0: + handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q", prefix, err.PartialResource.Resource, err.PartialResource.Group), DefaultErrorExitCode) + case len(err.PartialResource.Version) > 0: + handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in version %q", prefix, err.PartialResource.Resource, err.PartialResource.Version), DefaultErrorExitCode) + default: + handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q", prefix, err.PartialResource.Resource), DefaultErrorExitCode) + } + case utilerrors.Aggregate: + handleErr(MultipleErrors(prefix, err.Errors()), DefaultErrorExitCode) + default: // for any other error type + msg, ok := StandardErrorMessage(err) + if !ok { + msg = err.Error() + if !strings.HasPrefix(msg, "error: ") { + msg = fmt.Sprintf("error: %s", msg) + } + } + handleErr(msg, DefaultErrorExitCode) } } - handleErr(fmt.Sprintf("%s%s", pref, msg)) } func statusCausesToAggrError(scs []unversioned.StatusCause) utilerrors.Aggregate { diff --git a/pkg/kubectl/cmd/util/helpers_test.go b/pkg/kubectl/cmd/util/helpers_test.go index c02d24293c0..0ba5b746935 100644 --- a/pkg/kubectl/cmd/util/helpers_test.go +++ b/pkg/kubectl/cmd/util/helpers_test.go @@ -213,91 +213,78 @@ func (f *fileHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.Write(f.data) } +type checkErrTestCase struct { + err error + expectedErr string + expectedCode int +} + func TestCheckInvalidErr(t *testing.T) { - tests := []struct { - err error - expected string - }{ + testCheckError(t, []checkErrTestCase{ { errors.NewInvalid(api.Kind("Invalid1"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field"), "single", "details")}), - `error: The Invalid1 "invalidation" is invalid. field: Invalid value: "single": details`, + "The Invalid1 \"invalidation\" is invalid: field: Invalid value: \"single\": details\n", + DefaultErrorExitCode, }, { errors.NewInvalid(api.Kind("Invalid2"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field1"), "multi1", "details"), field.Invalid(field.NewPath("field2"), "multi2", "details")}), - `error: The Invalid2 "invalidation" is invalid. * field1: Invalid value: "multi1": details, * field2: Invalid value: "multi2": details`, + "The Invalid2 \"invalidation\" is invalid: \n* field1: Invalid value: \"multi1\": details\n* field2: Invalid value: \"multi2\": details\n", + DefaultErrorExitCode, }, { errors.NewInvalid(api.Kind("Invalid3"), "invalidation", field.ErrorList{}), - `error: The Invalid3 "invalidation" is invalid. %!s()`, + "The Invalid3 \"invalidation\" is invalid", + DefaultErrorExitCode, }, { errors.NewInvalid(api.Kind("Invalid4"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field4"), "multi4", "details"), field.Invalid(field.NewPath("field4"), "multi4", "details")}), - `error: The Invalid4 "invalidation" is invalid. field4: Invalid value: "multi4": details`, + "The Invalid4 \"invalidation\" is invalid: field4: Invalid value: \"multi4\": details\n", + DefaultErrorExitCode, }, - } - - var errReturned string - errHandle := func(err string) { - for _, v := range strings.Split(err, "\n") { - separator := " " - if errReturned == "" || v == "" { - separator = "" - } else if !strings.HasSuffix(errReturned, ".") { - separator = ", " - } - errReturned = fmt.Sprintf("%s%s%s", errReturned, separator, v) - } - if !strings.HasPrefix(errReturned, "error: ") { - errReturned = fmt.Sprintf("error: %s", errReturned) - } - if strings.HasSuffix(errReturned, ", ") { - errReturned = errReturned[:len(errReturned)-len(" ,")] - } - } - - for _, test := range tests { - checkErr("", test.err, errHandle) - - if errReturned != test.expected { - t.Fatalf("Got: %s, expected: %s", errReturned, test.expected) - } - errReturned = "" - } + }) } func TestCheckNoResourceMatchError(t *testing.T) { - tests := []struct { - err error - expected string - }{ + testCheckError(t, []checkErrTestCase{ { &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Resource: "foo"}}, `the server doesn't have a resource type "foo"`, + DefaultErrorExitCode, }, { &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Version: "theversion", Resource: "foo"}}, `the server doesn't have a resource type "foo" in version "theversion"`, + DefaultErrorExitCode, }, { &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Version: "theversion", Resource: "foo"}}, `the server doesn't have a resource type "foo" in group "thegroup" and version "theversion"`, + DefaultErrorExitCode, }, { &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Resource: "foo"}}, `the server doesn't have a resource type "foo" in group "thegroup"`, + DefaultErrorExitCode, }, - } + }) +} +func testCheckError(t *testing.T, tests []checkErrTestCase) { var errReturned string - errHandle := func(err string) { + var codeReturned int + errHandle := func(err string, code int) { errReturned = err + codeReturned = code } for _, test := range tests { checkErr("", test.err, errHandle) - if errReturned != test.expected { - t.Fatalf("Got: %s, expected: %s", errReturned, test.expected) + if errReturned != test.expectedErr { + t.Fatalf("Got: %s, expected: %s", errReturned, test.expectedErr) + } + if codeReturned != test.expectedCode { + t.Fatalf("Got: %d, expected: %d", codeReturned, test.expectedCode) } } } diff --git a/pkg/util/exec/exec.go b/pkg/util/exec/exec.go index e8768455103..e1ccba0972e 100644 --- a/pkg/util/exec/exec.go +++ b/pkg/util/exec/exec.go @@ -113,7 +113,7 @@ func (cmd *cmdWrapper) Output() ([]byte, error) { func handleError(err error) error { if ee, ok := err.(*osexec.ExitError); ok { // Force a compile fail if exitErrorWrapper can't convert to ExitError. - var x ExitError = &exitErrorWrapper{ee} + var x ExitError = &ExitErrorWrapper{ee} return x } if ee, ok := err.(*osexec.Error); ok { @@ -124,14 +124,16 @@ func handleError(err error) error { return err } -// exitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError. +// ExitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError. // Note: standard exec.ExitError is type *os.ProcessState, which already implements Exited(). -type exitErrorWrapper struct { +type ExitErrorWrapper struct { *osexec.ExitError } +var _ ExitError = ExitErrorWrapper{} + // ExitStatus is part of the ExitError interface. -func (eew exitErrorWrapper) ExitStatus() int { +func (eew ExitErrorWrapper) ExitStatus() int { ws, ok := eew.Sys().(syscall.WaitStatus) if !ok { panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper") From e792d4117d8735784f4d830ad1c9d4f68cfee145 Mon Sep 17 00:00:00 2001 From: bindata-mockuser Date: Mon, 8 Aug 2016 20:25:10 +0200 Subject: [PATCH 3/3] Add return code support to kubectl-exec and -run --- .../unversioned/remotecommand/errorstream.go | 55 ++++++ .../remotecommand/remotecommand.go | 2 + pkg/client/unversioned/remotecommand/v2.go | 30 +--- .../unversioned/remotecommand/v2_test.go | 2 +- pkg/client/unversioned/remotecommand/v3.go | 6 +- pkg/client/unversioned/remotecommand/v4.go | 119 +++++++++++++ .../unversioned/remotecommand/v4_test.go | 71 ++++++++ pkg/kubectl/cmd/run.go | 166 ++++++++++++++---- pkg/kubectl/cmd/util/helpers.go | 4 + pkg/kubectl/cmd/util/helpers_test.go | 11 ++ pkg/kubelet/dockertools/exec.go | 10 +- pkg/kubelet/server/remotecommand/attach.go | 13 +- pkg/kubelet/server/remotecommand/constants.go | 7 +- pkg/kubelet/server/remotecommand/exec.go | 36 +++- .../server/remotecommand/httpstream.go | 88 +++++++++- pkg/kubelet/server/remotecommand/websocket.go | 44 ++++- pkg/util/exec/exec.go | 25 +++ test/e2e/framework/util.go | 16 +- test/e2e/kubectl.go | 44 +++++ 19 files changed, 666 insertions(+), 83 deletions(-) create mode 100644 pkg/client/unversioned/remotecommand/errorstream.go create mode 100644 pkg/client/unversioned/remotecommand/v4.go create mode 100644 pkg/client/unversioned/remotecommand/v4_test.go diff --git a/pkg/client/unversioned/remotecommand/errorstream.go b/pkg/client/unversioned/remotecommand/errorstream.go new file mode 100644 index 00000000000..9d32a54992e --- /dev/null +++ b/pkg/client/unversioned/remotecommand/errorstream.go @@ -0,0 +1,55 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "fmt" + "io" + "io/ioutil" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// errorStreamDecoder interprets the data on the error channel and creates a go error object from it. +type errorStreamDecoder interface { + decode(message []byte) error +} + +// watchErrorStream watches the errorStream for remote command error data, +// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote +// command exited successfully) to the returned error channel, and closes it. +// This function returns immediately. +func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error { + errorChan := make(chan error) + + go func() { + defer runtime.HandleCrash() + + message, err := ioutil.ReadAll(errorStream) + switch { + case err != nil && err != io.EOF: + errorChan <- fmt.Errorf("error reading from error stream: %s", err) + case len(message) > 0: + errorChan <- d.decode(message) + default: + errorChan <- nil + } + close(errorChan) + }() + + return errorChan +} diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index ff18c4e763d..8faa87b9f1e 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -162,6 +162,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error { var streamer streamProtocolHandler switch protocol { + case remotecommand.StreamProtocolV4Name: + streamer = newStreamProtocolV4(options) case remotecommand.StreamProtocolV3Name: streamer = newStreamProtocolV3(options) case remotecommand.StreamProtocolV2Name: diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go index bcba33bdf7b..ce2249080a0 100644 --- a/pkg/client/unversioned/remotecommand/v2.go +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -88,27 +88,6 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error { return nil } -func (p *streamProtocolV2) setupErrorStreamReading() chan error { - errorChan := make(chan error) - - go func() { - defer runtime.HandleCrash() - - message, err := ioutil.ReadAll(p.errorStream) - switch { - case err != nil && err != io.EOF: - errorChan <- fmt.Errorf("error reading from error stream: %s", err) - case len(message) > 0: - errorChan <- fmt.Errorf("error executing remote command: %s", message) - default: - errorChan <- nil - } - close(errorChan) - }() - - return errorChan -} - func (p *streamProtocolV2) copyStdin() { if p.Stdin != nil { var once sync.Once @@ -193,7 +172,7 @@ func (p *streamProtocolV2) stream(conn streamCreator) error { // now that all the streams have been created, proceed with reading & copying - errorChan := p.setupErrorStreamReading() + errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{}) p.copyStdin() @@ -207,3 +186,10 @@ func (p *streamProtocolV2) stream(conn streamCreator) error { // waits for errorStream to finish reading with an error or nil return <-errorChan } + +// errorDecoderV2 interprets the error channel data as plain text. +type errorDecoderV2 struct{} + +func (d *errorDecoderV2) decode(message []byte) error { + return fmt.Errorf("error executing remote command: %s", message) +} diff --git a/pkg/client/unversioned/remotecommand/v2_test.go b/pkg/client/unversioned/remotecommand/v2_test.go index d5e988b1ec2..5b5fad4b004 100644 --- a/pkg/client/unversioned/remotecommand/v2_test.go +++ b/pkg/client/unversioned/remotecommand/v2_test.go @@ -199,7 +199,7 @@ func TestV2ErrorStreamReading(t *testing.T) { h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2) h.errorStream = test.stream - ch := h.setupErrorStreamReading() + ch := watchErrorStream(h.errorStream, &errorDecoderV2{}) if ch == nil { t.Fatalf("%s: unexpected nil channel", test.name) } diff --git a/pkg/client/unversioned/remotecommand/v3.go b/pkg/client/unversioned/remotecommand/v3.go index e2574a93aa9..0716d2b0736 100644 --- a/pkg/client/unversioned/remotecommand/v3.go +++ b/pkg/client/unversioned/remotecommand/v3.go @@ -90,7 +90,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error { // now that all the streams have been created, proceed with reading & copying - errorChan := p.setupErrorStreamReading() + errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{}) p.handleResizes() @@ -106,3 +106,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error { // waits for errorStream to finish reading with an error or nil return <-errorChan } + +type errorDecoderV3 struct { + errorDecoderV2 +} diff --git a/pkg/client/unversioned/remotecommand/v4.go b/pkg/client/unversioned/remotecommand/v4.go new file mode 100644 index 00000000000..5b8bb3ba55d --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v4.go @@ -0,0 +1,119 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" + "k8s.io/kubernetes/pkg/util/exec" +) + +// streamProtocolV4 implements version 4 of the streaming protocol for attach +// and exec. This version adds support for exit codes on the error stream through +// the use of unversioned.Status instead of plain text messages. +type streamProtocolV4 struct { + *streamProtocolV3 +} + +var _ streamProtocolHandler = &streamProtocolV4{} + +func newStreamProtocolV4(options StreamOptions) streamProtocolHandler { + return &streamProtocolV4{ + streamProtocolV3: newStreamProtocolV3(options).(*streamProtocolV3), + } +} + +func (p *streamProtocolV4) createStreams(conn streamCreator) error { + return p.streamProtocolV3.createStreams(conn) +} + +func (p *streamProtocolV4) handleResizes() { + p.streamProtocolV3.handleResizes() +} + +func (p *streamProtocolV4) stream(conn streamCreator) error { + if err := p.createStreams(conn); err != nil { + return err + } + + // now that all the streams have been created, proceed with reading & copying + + errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{}) + + p.handleResizes() + + p.copyStdin() + + var wg sync.WaitGroup + p.copyStdout(&wg) + p.copyStderr(&wg) + + // we're waiting for stdout/stderr to finish copying + wg.Wait() + + // waits for errorStream to finish reading with an error or nil + return <-errorChan +} + +// errorDecoderV4 interprets the json-marshaled unversioned.Status on the error channel +// and creates an exec.ExitError from it. +type errorDecoderV4 struct{} + +func (d *errorDecoderV4) decode(message []byte) error { + status := unversioned.Status{} + err := json.Unmarshal(message, &status) + if err != nil { + return fmt.Errorf("error stream protocol error: %v in %q", err, string(message)) + } + switch status.Status { + case unversioned.StatusSuccess: + return nil + case unversioned.StatusFailure: + if status.Reason == remotecommand.NonZeroExitCodeReason { + if status.Details == nil { + return errors.New("error stream protocol error: details must be set") + } + for i := range status.Details.Causes { + c := &status.Details.Causes[i] + if c.Type != remotecommand.ExitCodeCauseType { + continue + } + + rc, err := strconv.ParseUint(c.Message, 10, 8) + if err != nil { + return fmt.Errorf("error stream protocol error: invalid exit code value %q", c.Message) + } + return exec.CodeExitError{ + Err: fmt.Errorf("command terminated with exit code %d", rc), + Code: int(rc), + } + } + + return fmt.Errorf("error stream protocol error: no %s cause given", remotecommand.ExitCodeCauseType) + } + default: + return errors.New("error stream protocol error: unknown error") + } + + return fmt.Errorf(status.Message) +} diff --git a/pkg/client/unversioned/remotecommand/v4_test.go b/pkg/client/unversioned/remotecommand/v4_test.go new file mode 100644 index 00000000000..b8674918f2c --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v4_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "fmt" + "testing" +) + +func TestV4ErrorDecoder(t *testing.T) { + dec := errorDecoderV4{} + + type Test struct { + message string + err string + } + + for _, test := range []Test{ + { + message: "{}", + err: "error stream protocol error: unknown error", + }, + { + message: "{", + err: "error stream protocol error: unexpected end of JSON input in \"{\"", + }, + { + message: `{"status": "Success" }`, + err: "", + }, + { + message: `{"status": "Failure", "message": "foobar" }`, + err: "foobar", + }, + { + message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "foo"}] } }`, + err: "error stream protocol error: no ExitCode cause given", + }, + { + message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode"}] } }`, + err: "error stream protocol error: invalid exit code value \"\"", + }, + { + message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode", "message": "42"}] } }`, + err: "command terminated with exit code 42", + }, + } { + err := dec.decode([]byte(test.message)) + want := test.err + if want == "" { + want = "" + } + if got := fmt.Sprintf("%v", err); got != want { + t.Errorf("wrong error for message %q: want=%q, got=%q", test.message, want, got) + } + } +} diff --git a/pkg/kubectl/cmd/run.go b/pkg/kubectl/cmd/run.go index d9af5eaac31..43b897a5554 100644 --- a/pkg/kubectl/cmd/run.go +++ b/pkg/kubectl/cmd/run.go @@ -37,6 +37,8 @@ import ( cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/runtime" + uexec "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/watch" ) var ( @@ -114,7 +116,7 @@ func addRunFlags(cmd *cobra.Command) { cmd.Flags().StringP("labels", "l", "", "Labels to apply to the pod(s).") cmd.Flags().BoolP("stdin", "i", false, "Keep stdin open on the container(s) in the pod, even if nothing is attached.") cmd.Flags().BoolP("tty", "t", false, "Allocated a TTY for each container in the pod.") - cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true.") + cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true. With '--restart=Never' the exit code of the container process is returned.") cmd.Flags().Bool("leave-stdin-open", false, "If the pod is started in interactive mode or with stdin, leave stdin open after the first attach completes. By default, stdin will be closed after the first attach completes.") cmd.Flags().String("restart", "Always", "The restart policy for this Pod. Legal values [Always, OnFailure, Never]. If set to 'Always' a deployment is created for this pod, if set to 'OnFailure', a job is created for this pod, if set to 'Never', a regular pod is created. For the latter two --replicas must be 1. Default 'Always'") cmd.Flags().Bool("command", false, "If true and extra arguments are present, use them as the 'command' field in the container, rather than the 'args' field which is the default.") @@ -128,7 +130,6 @@ func addRunFlags(cmd *cobra.Command) { } func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cobra.Command, args []string, argsLenAtDash int) error { - quiet := cmdutil.GetFlagBool(cmd, "quiet") if len(os.Args) > 1 && os.Args[1] == "run-container" { printDeprecationWarning("run", "run-container") } @@ -243,6 +244,7 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob } if attach { + quiet := cmdutil.GetFlagBool(cmd, "quiet") opts := &AttachOptions{ StreamOptions: StreamOptions{ In: cmdIn, @@ -273,11 +275,21 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob if err != nil { return err } - err = handleAttachPod(f, client, attachablePod, opts, quiet) + err = handleAttachPod(f, client, attachablePod.Namespace, attachablePod.Name, opts, quiet) if err != nil { return err } + var pod *api.Pod + leaveStdinOpen := cmdutil.GetFlagBool(cmd, "leave-stdin-open") + waitForExitCode := !leaveStdinOpen && restartPolicy == api.RestartPolicyNever + if waitForExitCode { + pod, err = waitForPodTerminated(client, attachablePod.Namespace, attachablePod.Name, opts.Out, quiet) + if err != nil { + return err + } + } + if remove { namespace, err = mapping.MetadataAccessor.Namespace(obj) if err != nil { @@ -295,9 +307,37 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob ResourceNames(mapping.Resource, name). Flatten(). Do() - return ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet) + err = ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet) + if err != nil { + return err + } + } + + // after removal is done, return successfully if we are not interested in the exit code + if !waitForExitCode { + return nil + } + + switch pod.Status.Phase { + case api.PodSucceeded: + return nil + case api.PodFailed: + unknownRcErr := fmt.Errorf("pod %s/%s failed with unknown exit code", pod.Namespace, pod.Name) + if len(pod.Status.ContainerStatuses) == 0 || pod.Status.ContainerStatuses[0].State.Terminated == nil { + return unknownRcErr + } + // assume here that we have at most one status because kubectl-run only creates one container per pod + rc := pod.Status.ContainerStatuses[0].State.Terminated.ExitCode + if rc == 0 { + return unknownRcErr + } + return uexec.CodeExitError{ + Err: fmt.Errorf("pod %s/%s terminated", pod.Namespace, pod.Name), + Code: int(rc), + } + default: + return fmt.Errorf("pod %s/%s left in phase %s", pod.Namespace, pod.Name, pod.Status.Phase) } - return nil } outputFormat := cmdutil.GetFlagString(cmd, "output") @@ -325,37 +365,91 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un return false } -func waitForPodRunning(c *client.Client, pod *api.Pod, out io.Writer, quiet bool) (status api.PodPhase, err error) { - for { - pod, err := c.Pods(pod.Namespace).Get(pod.Name) - if err != nil { - return api.PodUnknown, err - } - ready := false - if pod.Status.Phase == api.PodRunning { - ready = true - for _, status := range pod.Status.ContainerStatuses { - if !status.Ready { - ready = false - break - } - } - if ready { - return api.PodRunning, nil - } - } - if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed { - return pod.Status.Phase, nil - } - if !quiet { - fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: %v\n", pod.Namespace, pod.Name, pod.Status.Phase, ready) - } - time.Sleep(2 * time.Second) +// waitForPod watches the given pod until the exitCondition is true. Each two seconds +// the tick function is called e.g. for progress output. +func waitForPod(c *client.Client, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) { + pod, err := c.Pods(ns).Get(name) + if err != nil { + return nil, err } + if exitCondition(pod) { + return pod, nil + } + + tick(pod) + + w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion})) + if err != nil { + return nil, err + } + + t := time.NewTicker(2 * time.Second) + defer t.Stop() + go func() { + for range t.C { + tick(pod) + } + }() + + err = nil + result := pod + kubectl.WatchLoop(w, func(ev watch.Event) error { + switch ev.Type { + case watch.Added, watch.Modified: + pod = ev.Object.(*api.Pod) + if exitCondition(pod) { + result = pod + w.Stop() + } + case watch.Deleted: + w.Stop() + case watch.Error: + result = nil + err = fmt.Errorf("failed to watch pod %s/%s", ns, name) + w.Stop() + } + return nil + }) + + return result, err } -func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *AttachOptions, quiet bool) error { - status, err := waitForPodRunning(c, pod, opts.Out, quiet) +func waitForPodRunning(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) { + exitCondition := func(pod *api.Pod) bool { + switch pod.Status.Phase { + case api.PodRunning: + for _, status := range pod.Status.ContainerStatuses { + if !status.Ready { + return false + } + } + return true + case api.PodSucceeded, api.PodFailed: + return true + default: + return false + } + } + return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) { + if !quiet { + fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase) + } + }) +} + +func waitForPodTerminated(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) { + exitCondition := func(pod *api.Pod) bool { + return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed + } + return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) { + if !quiet { + fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase) + } + }) +} + +func handleAttachPod(f *cmdutil.Factory, c *client.Client, ns, name string, opts *AttachOptions, quiet bool) error { + pod, err := waitForPodRunning(c, ns, name, opts.Out, quiet) if err != nil { return err } @@ -363,7 +457,7 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A if err != nil { return err } - if status == api.PodSucceeded || status == api.PodFailed { + if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed { req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName}) if err != nil { return err @@ -377,8 +471,8 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A return err } opts.Client = c - opts.PodName = pod.Name - opts.Namespace = pod.Namespace + opts.PodName = name + opts.Namespace = ns if err := opts.Run(); err != nil { fmt.Fprintf(opts.Out, "Error attaching, falling back to logs: %v\n", err) req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName}) diff --git a/pkg/kubectl/cmd/util/helpers.go b/pkg/kubectl/cmd/util/helpers.go index 7208e2f98ae..dac0610e7ad 100644 --- a/pkg/kubectl/cmd/util/helpers.go +++ b/pkg/kubectl/cmd/util/helpers.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" + utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/strategicpatch" @@ -150,6 +151,9 @@ func checkErr(prefix string, err error, handleErr func(string, int)) { } case utilerrors.Aggregate: handleErr(MultipleErrors(prefix, err.Errors()), DefaultErrorExitCode) + case utilexec.ExitError: + // do not print anything, only terminate with given error + handleErr("", err.ExitStatus()) default: // for any other error type msg, ok := StandardErrorMessage(err) if !ok { diff --git a/pkg/kubectl/cmd/util/helpers_test.go b/pkg/kubectl/cmd/util/helpers_test.go index 0ba5b746935..43a2bffd4fe 100644 --- a/pkg/kubectl/cmd/util/helpers_test.go +++ b/pkg/kubectl/cmd/util/helpers_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/runtime" + uexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/validation/field" ) @@ -269,6 +270,16 @@ func TestCheckNoResourceMatchError(t *testing.T) { }) } +func TestCheckExitError(t *testing.T) { + testCheckError(t, []checkErrTestCase{ + { + uexec.CodeExitError{Err: fmt.Errorf("pod foo/bar terminated"), Code: 42}, + "", + 42, + }, + }) +} + func testCheckError(t *testing.T, tests []checkErrTestCase) { var errReturned string var codeReturned int diff --git a/pkg/kubelet/dockertools/exec.go b/pkg/kubelet/dockertools/exec.go index ce213650b06..03dcbe34ff6 100644 --- a/pkg/kubelet/dockertools/exec.go +++ b/pkg/kubelet/dockertools/exec.go @@ -26,6 +26,7 @@ import ( dockertypes "github.com/docker/engine-api/types" "github.com/golang/glog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/term" ) @@ -74,7 +75,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do go io.Copy(stdout, p) } - return command.Wait() + err = command.Wait() } else { if stdin != nil { // Use an os.Pipe here as it returns true *os.File objects. @@ -96,8 +97,13 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do command.Stderr = stderr } - return command.Run() + err = command.Run() } + + if exitErr, ok := err.(*exec.ExitError); ok { + return &utilexec.ExitErrorWrapper{ExitError: exitErr} + } + return err } // NativeExecHandler executes commands in Docker containers using Docker's exec API. diff --git a/pkg/kubelet/server/remotecommand/attach.go b/pkg/kubelet/server/remotecommand/attach.go index 372ae54a210..607f7338500 100644 --- a/pkg/kubelet/server/remotecommand/attach.go +++ b/pkg/kubelet/server/remotecommand/attach.go @@ -17,12 +17,13 @@ limitations under the License. package remotecommand import ( - "errors" "fmt" "io" "net/http" "time" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/term" @@ -47,8 +48,12 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { - msg := fmt.Sprintf("error attaching to container: %v", err) - runtime.HandleError(errors.New(msg)) - fmt.Fprint(ctx.errorStream, msg) + err = fmt.Errorf("error attaching to container: %v", err) + runtime.HandleError(err) + ctx.writeStatus(apierrors.NewInternalError(err)) + } else { + ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{ + Status: unversioned.StatusSuccess, + }}) } } diff --git a/pkg/kubelet/server/remotecommand/constants.go b/pkg/kubelet/server/remotecommand/constants.go index 33e0dce29a4..ebc602b6262 100644 --- a/pkg/kubelet/server/remotecommand/constants.go +++ b/pkg/kubelet/server/remotecommand/constants.go @@ -36,6 +36,11 @@ const ( // attachment/execution. It is the third version of the subprotocol and // adds support for resizing container terminals. StreamProtocolV3Name = "v3.channel.k8s.io" + + // The SPDY subprotocol "v4.channel.k8s.io" is used for remote command + // attachment/execution. It is the 4th version of the subprotocol and + // adds support for exit codes. + StreamProtocolV4Name = "v4.channel.k8s.io" ) -var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name} +var SupportedStreamingProtocols = []string{StreamProtocolV4Name, StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name} diff --git a/pkg/kubelet/server/remotecommand/exec.go b/pkg/kubelet/server/remotecommand/exec.go index fcc4c230bf9..3b82d8de832 100644 --- a/pkg/kubelet/server/remotecommand/exec.go +++ b/pkg/kubelet/server/remotecommand/exec.go @@ -17,18 +17,25 @@ limitations under the License. package remotecommand import ( - "errors" "fmt" "io" "net/http" "time" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/types" + utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/term" ) +const ( + NonZeroExitCodeReason = unversioned.StatusReason("NonZeroExitCode") + ExitCodeCauseType = unversioned.CauseType("ExitCode") +) + // Executor knows how to execute a command in a container in a pod. type Executor interface { // ExecInContainer executes a command in a container in the pod, copying data @@ -51,8 +58,29 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { - msg := fmt.Sprintf("error executing command in container: %v", err) - runtime.HandleError(errors.New(msg)) - fmt.Fprint(ctx.errorStream, msg) + if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { + rc := exitErr.ExitStatus() + ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{ + Status: unversioned.StatusFailure, + Reason: NonZeroExitCodeReason, + Details: &unversioned.StatusDetails{ + Causes: []unversioned.StatusCause{ + { + Type: ExitCodeCauseType, + Message: fmt.Sprintf("%d", rc), + }, + }, + }, + Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr), + }}) + } else { + err = fmt.Errorf("error executing command in container: %v", err) + runtime.HandleError(err) + ctx.writeStatus(apierrors.NewInternalError(err)) + } + } else { + ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{ + Status: unversioned.StatusSuccess, + }}) } } diff --git a/pkg/kubelet/server/remotecommand/httpstream.go b/pkg/kubelet/server/remotecommand/httpstream.go index 83db923960c..141a8836cf8 100644 --- a/pkg/kubelet/server/remotecommand/httpstream.go +++ b/pkg/kubelet/server/remotecommand/httpstream.go @@ -25,6 +25,8 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/runtime" @@ -88,7 +90,7 @@ type context struct { stdinStream io.ReadCloser stdoutStream io.WriteCloser stderrStream io.WriteCloser - errorStream io.WriteCloser + writeStatus func(status *apierrors.StatusError) error resizeStream io.ReadCloser resizeChan chan term.Size tty bool @@ -168,6 +170,8 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt var handler protocolHandler switch protocol { + case StreamProtocolV4Name: + handler = &v4ProtocolHandler{} case StreamProtocolV3Name: handler = &v3ProtocolHandler{} case StreamProtocolV2Name: @@ -206,6 +210,59 @@ type protocolHandler interface { supportsTerminalResizing() bool } +// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs +// in from v3 in the error stream format using an json-marshaled unversioned.Status which carries +// the process' exit code. +type v4ProtocolHandler struct{} + +func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { + ctx := &context{} + receivedStreams := 0 + replyChan := make(chan struct{}) + stop := make(chan struct{}) + defer close(stop) +WaitForStreams: + for { + select { + case stream := <-streams: + streamType := stream.Headers().Get(api.StreamType) + switch streamType { + case api.StreamTypeError: + ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdin: + ctx.stdinStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdout: + ctx.stdoutStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStderr: + ctx.stderrStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeResize: + ctx.resizeStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + default: + runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) + } + case <-replyChan: + receivedStreams++ + if receivedStreams == expectedStreams { + break WaitForStreams + } + case <-expired: + // TODO find a way to return the error to the user. Maybe use a separate + // stream to report errors? + return nil, errors.New("timed out waiting for client to create streams") + } + } + + return ctx, nil +} + +// supportsTerminalResizing returns true because v4ProtocolHandler supports it +func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true } + // v3ProtocolHandler implements the V3 protocol version for streaming command execution. type v3ProtocolHandler struct{} @@ -222,7 +279,7 @@ WaitForStreams: streamType := stream.Headers().Get(api.StreamType) switch streamType { case api.StreamTypeError: - ctx.errorStream = stream + ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdin: ctx.stdinStream = stream @@ -273,7 +330,7 @@ WaitForStreams: streamType := stream.Headers().Get(api.StreamType) switch streamType { case api.StreamTypeError: - ctx.errorStream = stream + ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdin: ctx.stdinStream = stream @@ -321,7 +378,7 @@ WaitForStreams: streamType := stream.Headers().Get(api.StreamType) switch streamType { case api.StreamTypeError: - ctx.errorStream = stream + ctx.writeStatus = v1WriteStatusFunc(stream) // This defer statement shouldn't be here, but due to previous refactoring, it ended up in // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in @@ -375,3 +432,26 @@ func handleResizeEvents(stream io.Reader, channel chan<- term.Size) { channel <- size } } + +func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error { + return func(status *apierrors.StatusError) error { + if status.Status().Status == unversioned.StatusSuccess { + return nil // send error messages + } + _, err := stream.Write([]byte(status.Error())) + return err + } +} + +// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status +// as json in the error channel. +func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error { + return func(status *apierrors.StatusError) error { + bs, err := json.Marshal(status.Status()) + if err != nil { + return err + } + _, err = stream.Write(bs) + return err + } +} diff --git a/pkg/kubelet/server/remotecommand/websocket.go b/pkg/kubelet/server/remotecommand/websocket.go index efeb1b6646e..243f6dfc0b0 100644 --- a/pkg/kubelet/server/remotecommand/websocket.go +++ b/pkg/kubelet/server/remotecommand/websocket.go @@ -32,6 +32,11 @@ const ( stderrChannel errorChannel resizeChannel + + preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol + preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol + v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol + v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol ) // createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) @@ -67,9 +72,30 @@ func writeChannel(real bool) wsstream.ChannelType { // streams needed to perform an exec or an attach. func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) { channels := createChannels(opts) - conn := wsstream.NewConn(channels...) + conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ + "": { + Binary: true, + Channels: channels, + }, + preV4BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, + preV4Base64WebsocketProtocol: { + Binary: false, + Channels: channels, + }, + v4BinaryWebsocketProtocol: { + Binary: true, + Channels: channels, + }, + v4Base64WebsocketProtocol: { + Binary: false, + Channels: channels, + }, + }) conn.SetIdleTimeout(idleTimeout) - streams, err := conn.Open(httplog.Unlogged(w), req) + negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(w), req) if err != nil { runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) return nil, false @@ -86,13 +112,21 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti streams[errorChannel].Write([]byte{}) } - return &context{ + ctx := &context{ conn: conn, stdinStream: streams[stdinChannel], stdoutStream: streams[stdoutChannel], stderrStream: streams[stderrChannel], - errorStream: streams[errorChannel], tty: opts.tty, resizeStream: streams[resizeChannel], - }, true + } + + switch negotiatedProtocol { + case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol: + ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel]) + default: + ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel]) + } + + return ctx, true } diff --git a/pkg/util/exec/exec.go b/pkg/util/exec/exec.go index e1ccba0972e..1aeba036f4d 100644 --- a/pkg/util/exec/exec.go +++ b/pkg/util/exec/exec.go @@ -140,3 +140,28 @@ func (eew ExitErrorWrapper) ExitStatus() int { } return ws.ExitStatus() } + +// CodeExitError is an implementation of ExitError consisting of an error object +// and an exit code (the upper bits of os.exec.ExitStatus). +type CodeExitError struct { + Err error + Code int +} + +var _ ExitError = CodeExitError{} + +func (e CodeExitError) Error() string { + return e.Err.Error() +} + +func (e CodeExitError) String() string { + return e.Err.Error() +} + +func (e CodeExitError) Exited() bool { + return true +} + +func (e CodeExitError) ExitStatus() int { + return e.Code +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 73eb5c735aa..83c9dfed81e 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -36,6 +36,7 @@ import ( "strconv" "strings" "sync" + "syscall" "time" "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" @@ -62,6 +63,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" sshutil "k8s.io/kubernetes/pkg/ssh" "k8s.io/kubernetes/pkg/types" + uexec "k8s.io/kubernetes/pkg/util/exec" labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/system" @@ -1996,7 +1998,7 @@ func (b kubectlBuilder) Exec() (string, error) { Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately if err := cmd.Start(); err != nil { - return "", fmt.Errorf("Error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err) + return "", fmt.Errorf("error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err) } errCh := make(chan error, 1) go func() { @@ -2005,11 +2007,19 @@ func (b kubectlBuilder) Exec() (string, error) { select { case err := <-errCh: if err != nil { - return "", fmt.Errorf("Error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err) + var rc int = 127 + if ee, ok := err.(*exec.ExitError); ok { + Logf("rc: %d", rc) + rc = int(ee.Sys().(syscall.WaitStatus).ExitStatus()) + } + return "", uexec.CodeExitError{ + Err: fmt.Errorf("error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err), + Code: rc, + } } case <-b.timeout: b.cmd.Process.Kill() - return "", fmt.Errorf("Timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr) + return "", fmt.Errorf("timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr) } Logf("stderr: %q", stderr.String()) return stdout.String(), nil diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index c4bc0b23c31..0f8488e275f 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/generic/registry" + uexec "k8s.io/kubernetes/pkg/util/exec" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" @@ -348,6 +349,49 @@ var _ = framework.KubeDescribe("Kubectl client", func() { } }) + It("should return command exit codes", func() { + nsFlag := fmt.Sprintf("--namespace=%v", ns) + + By("execing into a container with a successful command") + _, err := framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 0").Exec() + ExpectNoError(err) + + By("execing into a container with a failing command") + _, err = framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 42").Exec() + ee, ok := err.(uexec.ExitError) + Expect(ok).To(Equal(true)) + Expect(ee.ExitStatus()).To(Equal(42)) + + By("running a successful command") + _, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "success", "--", "/bin/sh", "-c", "exit 0").Exec() + ExpectNoError(err) + + By("running a failing command") + _, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-1", "--", "/bin/sh", "-c", "exit 42").Exec() + ee, ok = err.(uexec.ExitError) + Expect(ok).To(Equal(true)) + Expect(ee.ExitStatus()).To(Equal(42)) + + By("running a failing command without --restart=Never") + _, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "failure-2", "--", "/bin/sh", "-c", "cat && exit 42"). + WithStdinData("abcd1234"). + Exec() + ExpectNoError(err) + + By("running a failing command without --restart=Never, but with --rm") + _, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "--rm", "failure-3", "--", "/bin/sh", "-c", "cat && exit 42"). + WithStdinData("abcd1234"). + Exec() + ExpectNoError(err) + framework.WaitForPodToDisappear(f.Client, ns, "failure-3", labels.Everything(), 2*time.Second, wait.ForeverTestTimeout) + + By("running a failing command with --leave-stdin-open") + _, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-4", "--leave-stdin-open", "--", "/bin/sh", "-c", "exit 42"). + WithStdinData("abcd1234"). + Exec() + ExpectNoError(err) + }) + It("should support inline execution and attach", func() { framework.SkipIfContainerRuntimeIs("rkt") // #23335 framework.SkipUnlessServerVersionGTE(jobsVersion, c)