Merge pull request #26541 from sttts/sttts-kubectl-exec-rc

Automatic merge from submit-queue

Return container command exit codes in kubectl run/exec

Fixes https://github.com/kubernetes/kubernetes/issues/26424
Based on https://github.com/kubernetes/kubernetes/pull/25273.

TODO:
- [x] add e2e tests
- [x] investigate `kubectl run` exit code for `--restart=Never` (compare issue #24533 and PR #25253)
- [x] document exit codes
This commit is contained in:
Kubernetes Submit Queue 2016-08-21 00:44:55 -07:00 committed by GitHub
commit 7272cd09e3
28 changed files with 1065 additions and 270 deletions

View File

@ -451,7 +451,7 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri
defer out.Close() defer out.Close()
if wsstream.IsWebSocketRequest(req) { if wsstream.IsWebSocketRequest(req) {
r := wsstream.NewReader(out, true) r := wsstream.NewReader(out, true, wsstream.NewDefaultReaderProtocols())
if err := r.Copy(w, req); err != nil { if err := r.Copy(w, req); err != nil {
utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err)) utilruntime.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
} }

View File

@ -0,0 +1,55 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"io"
"io/ioutil"
"k8s.io/kubernetes/pkg/util/runtime"
)
// errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
type errorStreamDecoder interface {
decode(message []byte) error
}
// watchErrorStream watches the errorStream for remote command error data,
// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
// command exited successfully) to the returned error channel, and closes it.
// This function returns immediately.
func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
errorChan := make(chan error)
go func() {
defer runtime.HandleCrash()
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil && err != io.EOF:
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
case len(message) > 0:
errorChan <- d.decode(message)
default:
errorChan <- nil
}
close(errorChan)
}()
return errorChan
}

View File

@ -162,6 +162,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
var streamer streamProtocolHandler var streamer streamProtocolHandler
switch protocol { switch protocol {
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name: case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options) streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name: case remotecommand.StreamProtocolV2Name:

View File

@ -88,27 +88,6 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error {
return nil return nil
} }
func (p *streamProtocolV2) setupErrorStreamReading() chan error {
errorChan := make(chan error)
go func() {
defer runtime.HandleCrash()
message, err := ioutil.ReadAll(p.errorStream)
switch {
case err != nil && err != io.EOF:
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
case len(message) > 0:
errorChan <- fmt.Errorf("error executing remote command: %s", message)
default:
errorChan <- nil
}
close(errorChan)
}()
return errorChan
}
func (p *streamProtocolV2) copyStdin() { func (p *streamProtocolV2) copyStdin() {
if p.Stdin != nil { if p.Stdin != nil {
var once sync.Once var once sync.Once
@ -193,7 +172,7 @@ func (p *streamProtocolV2) stream(conn streamCreator) error {
// now that all the streams have been created, proceed with reading & copying // now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading() errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
p.copyStdin() p.copyStdin()
@ -207,3 +186,10 @@ func (p *streamProtocolV2) stream(conn streamCreator) error {
// waits for errorStream to finish reading with an error or nil // waits for errorStream to finish reading with an error or nil
return <-errorChan return <-errorChan
} }
// errorDecoderV2 interprets the error channel data as plain text.
type errorDecoderV2 struct{}
func (d *errorDecoderV2) decode(message []byte) error {
return fmt.Errorf("error executing remote command: %s", message)
}

View File

@ -199,7 +199,7 @@ func TestV2ErrorStreamReading(t *testing.T) {
h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2) h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
h.errorStream = test.stream h.errorStream = test.stream
ch := h.setupErrorStreamReading() ch := watchErrorStream(h.errorStream, &errorDecoderV2{})
if ch == nil { if ch == nil {
t.Fatalf("%s: unexpected nil channel", test.name) t.Fatalf("%s: unexpected nil channel", test.name)
} }

View File

@ -90,7 +90,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error {
// now that all the streams have been created, proceed with reading & copying // now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading() errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
p.handleResizes() p.handleResizes()
@ -106,3 +106,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error {
// waits for errorStream to finish reading with an error or nil // waits for errorStream to finish reading with an error or nil
return <-errorChan return <-errorChan
} }
type errorDecoderV3 struct {
errorDecoderV2
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/exec"
)
// streamProtocolV4 implements version 4 of the streaming protocol for attach
// and exec. This version adds support for exit codes on the error stream through
// the use of unversioned.Status instead of plain text messages.
type streamProtocolV4 struct {
*streamProtocolV3
}
var _ streamProtocolHandler = &streamProtocolV4{}
func newStreamProtocolV4(options StreamOptions) streamProtocolHandler {
return &streamProtocolV4{
streamProtocolV3: newStreamProtocolV3(options).(*streamProtocolV3),
}
}
func (p *streamProtocolV4) createStreams(conn streamCreator) error {
return p.streamProtocolV3.createStreams(conn)
}
func (p *streamProtocolV4) handleResizes() {
p.streamProtocolV3.handleResizes()
}
func (p *streamProtocolV4) stream(conn streamCreator) error {
if err := p.createStreams(conn); err != nil {
return err
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
p.handleResizes()
p.copyStdin()
var wg sync.WaitGroup
p.copyStdout(&wg)
p.copyStderr(&wg)
// we're waiting for stdout/stderr to finish copying
wg.Wait()
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}
// errorDecoderV4 interprets the json-marshaled unversioned.Status on the error channel
// and creates an exec.ExitError from it.
type errorDecoderV4 struct{}
func (d *errorDecoderV4) decode(message []byte) error {
status := unversioned.Status{}
err := json.Unmarshal(message, &status)
if err != nil {
return fmt.Errorf("error stream protocol error: %v in %q", err, string(message))
}
switch status.Status {
case unversioned.StatusSuccess:
return nil
case unversioned.StatusFailure:
if status.Reason == remotecommand.NonZeroExitCodeReason {
if status.Details == nil {
return errors.New("error stream protocol error: details must be set")
}
for i := range status.Details.Causes {
c := &status.Details.Causes[i]
if c.Type != remotecommand.ExitCodeCauseType {
continue
}
rc, err := strconv.ParseUint(c.Message, 10, 8)
if err != nil {
return fmt.Errorf("error stream protocol error: invalid exit code value %q", c.Message)
}
return exec.CodeExitError{
Err: fmt.Errorf("command terminated with exit code %d", rc),
Code: int(rc),
}
}
return fmt.Errorf("error stream protocol error: no %s cause given", remotecommand.ExitCodeCauseType)
}
default:
return errors.New("error stream protocol error: unknown error")
}
return fmt.Errorf(status.Message)
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"testing"
)
func TestV4ErrorDecoder(t *testing.T) {
dec := errorDecoderV4{}
type Test struct {
message string
err string
}
for _, test := range []Test{
{
message: "{}",
err: "error stream protocol error: unknown error",
},
{
message: "{",
err: "error stream protocol error: unexpected end of JSON input in \"{\"",
},
{
message: `{"status": "Success" }`,
err: "",
},
{
message: `{"status": "Failure", "message": "foobar" }`,
err: "foobar",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "foo"}] } }`,
err: "error stream protocol error: no ExitCode cause given",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode"}] } }`,
err: "error stream protocol error: invalid exit code value \"\"",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode", "message": "42"}] } }`,
err: "command terminated with exit code 42",
},
} {
err := dec.decode([]byte(test.message))
want := test.err
if want == "" {
want = "<nil>"
}
if got := fmt.Sprintf("%v", err); got != want {
t.Errorf("wrong error for message %q: want=%q, got=%q", test.message, want, got)
}
}
}

View File

@ -46,8 +46,8 @@ import (
) )
func initTestErrorHandler(t *testing.T) { func initTestErrorHandler(t *testing.T) {
cmdutil.BehaviorOnFatal(func(str string) { cmdutil.BehaviorOnFatal(func(str string, code int) {
t.Errorf("Error running command: %s", str) t.Errorf("Error running command (exit code %d): %s", code, str)
}) })
} }

View File

@ -177,7 +177,7 @@ func TestCordon(t *testing.T) {
// Restore cmdutil behavior // Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal() cmdutil.DefaultBehaviorOnFatal()
}() }()
cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetArgs([]string{test.arg}) cmd.SetArgs([]string{test.arg})
cmd.Execute() cmd.Execute()
}() }()
@ -521,7 +521,7 @@ func TestDrain(t *testing.T) {
// Restore cmdutil behavior // Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal() cmdutil.DefaultBehaviorOnFatal()
}() }()
cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetArgs(test.args) cmd.SetArgs(test.args)
cmd.Execute() cmd.Execute()
}() }()

View File

@ -37,6 +37,8 @@ import (
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/watch"
) )
var ( var (
@ -114,7 +116,7 @@ func addRunFlags(cmd *cobra.Command) {
cmd.Flags().StringP("labels", "l", "", "Labels to apply to the pod(s).") cmd.Flags().StringP("labels", "l", "", "Labels to apply to the pod(s).")
cmd.Flags().BoolP("stdin", "i", false, "Keep stdin open on the container(s) in the pod, even if nothing is attached.") cmd.Flags().BoolP("stdin", "i", false, "Keep stdin open on the container(s) in the pod, even if nothing is attached.")
cmd.Flags().BoolP("tty", "t", false, "Allocated a TTY for each container in the pod.") cmd.Flags().BoolP("tty", "t", false, "Allocated a TTY for each container in the pod.")
cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true.") cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true. With '--restart=Never' the exit code of the container process is returned.")
cmd.Flags().Bool("leave-stdin-open", false, "If the pod is started in interactive mode or with stdin, leave stdin open after the first attach completes. By default, stdin will be closed after the first attach completes.") cmd.Flags().Bool("leave-stdin-open", false, "If the pod is started in interactive mode or with stdin, leave stdin open after the first attach completes. By default, stdin will be closed after the first attach completes.")
cmd.Flags().String("restart", "Always", "The restart policy for this Pod. Legal values [Always, OnFailure, Never]. If set to 'Always' a deployment is created for this pod, if set to 'OnFailure', a job is created for this pod, if set to 'Never', a regular pod is created. For the latter two --replicas must be 1. Default 'Always'") cmd.Flags().String("restart", "Always", "The restart policy for this Pod. Legal values [Always, OnFailure, Never]. If set to 'Always' a deployment is created for this pod, if set to 'OnFailure', a job is created for this pod, if set to 'Never', a regular pod is created. For the latter two --replicas must be 1. Default 'Always'")
cmd.Flags().Bool("command", false, "If true and extra arguments are present, use them as the 'command' field in the container, rather than the 'args' field which is the default.") cmd.Flags().Bool("command", false, "If true and extra arguments are present, use them as the 'command' field in the container, rather than the 'args' field which is the default.")
@ -128,7 +130,6 @@ func addRunFlags(cmd *cobra.Command) {
} }
func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cobra.Command, args []string, argsLenAtDash int) error { func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cobra.Command, args []string, argsLenAtDash int) error {
quiet := cmdutil.GetFlagBool(cmd, "quiet")
if len(os.Args) > 1 && os.Args[1] == "run-container" { if len(os.Args) > 1 && os.Args[1] == "run-container" {
printDeprecationWarning("run", "run-container") printDeprecationWarning("run", "run-container")
} }
@ -243,6 +244,7 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
} }
if attach { if attach {
quiet := cmdutil.GetFlagBool(cmd, "quiet")
opts := &AttachOptions{ opts := &AttachOptions{
StreamOptions: StreamOptions{ StreamOptions: StreamOptions{
In: cmdIn, In: cmdIn,
@ -273,11 +275,21 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
if err != nil { if err != nil {
return err return err
} }
err = handleAttachPod(f, client, attachablePod, opts, quiet) err = handleAttachPod(f, client, attachablePod.Namespace, attachablePod.Name, opts, quiet)
if err != nil { if err != nil {
return err return err
} }
var pod *api.Pod
leaveStdinOpen := cmdutil.GetFlagBool(cmd, "leave-stdin-open")
waitForExitCode := !leaveStdinOpen && restartPolicy == api.RestartPolicyNever
if waitForExitCode {
pod, err = waitForPodTerminated(client, attachablePod.Namespace, attachablePod.Name, opts.Out, quiet)
if err != nil {
return err
}
}
if remove { if remove {
namespace, err = mapping.MetadataAccessor.Namespace(obj) namespace, err = mapping.MetadataAccessor.Namespace(obj)
if err != nil { if err != nil {
@ -295,11 +307,39 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
ResourceNames(mapping.Resource, name). ResourceNames(mapping.Resource, name).
Flatten(). Flatten().
Do() Do()
return ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet) err = ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet)
if err != nil {
return err
} }
}
// after removal is done, return successfully if we are not interested in the exit code
if !waitForExitCode {
return nil return nil
} }
switch pod.Status.Phase {
case api.PodSucceeded:
return nil
case api.PodFailed:
unknownRcErr := fmt.Errorf("pod %s/%s failed with unknown exit code", pod.Namespace, pod.Name)
if len(pod.Status.ContainerStatuses) == 0 || pod.Status.ContainerStatuses[0].State.Terminated == nil {
return unknownRcErr
}
// assume here that we have at most one status because kubectl-run only creates one container per pod
rc := pod.Status.ContainerStatuses[0].State.Terminated.ExitCode
if rc == 0 {
return unknownRcErr
}
return uexec.CodeExitError{
Err: fmt.Errorf("pod %s/%s terminated", pod.Namespace, pod.Name),
Code: int(rc),
}
default:
return fmt.Errorf("pod %s/%s left in phase %s", pod.Namespace, pod.Name, pod.Status.Phase)
}
}
outputFormat := cmdutil.GetFlagString(cmd, "output") outputFormat := cmdutil.GetFlagString(cmd, "output")
if outputFormat != "" || cmdutil.GetDryRunFlag(cmd) { if outputFormat != "" || cmdutil.GetDryRunFlag(cmd) {
return f.PrintObject(cmd, mapper, obj, cmdOut) return f.PrintObject(cmd, mapper, obj, cmdOut)
@ -325,37 +365,91 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un
return false return false
} }
func waitForPodRunning(c *client.Client, pod *api.Pod, out io.Writer, quiet bool) (status api.PodPhase, err error) { // waitForPod watches the given pod until the exitCondition is true. Each two seconds
for { // the tick function is called e.g. for progress output.
pod, err := c.Pods(pod.Namespace).Get(pod.Name) func waitForPod(c *client.Client, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) {
pod, err := c.Pods(ns).Get(name)
if err != nil { if err != nil {
return api.PodUnknown, err return nil, err
}
ready := false
if pod.Status.Phase == api.PodRunning {
ready = true
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
ready = false
break
}
}
if ready {
return api.PodRunning, nil
}
}
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
return pod.Status.Phase, nil
}
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: %v\n", pod.Namespace, pod.Name, pod.Status.Phase, ready)
}
time.Sleep(2 * time.Second)
} }
if exitCondition(pod) {
return pod, nil
} }
func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *AttachOptions, quiet bool) error { tick(pod)
status, err := waitForPodRunning(c, pod, opts.Out, quiet)
w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion}))
if err != nil {
return nil, err
}
t := time.NewTicker(2 * time.Second)
defer t.Stop()
go func() {
for range t.C {
tick(pod)
}
}()
err = nil
result := pod
kubectl.WatchLoop(w, func(ev watch.Event) error {
switch ev.Type {
case watch.Added, watch.Modified:
pod = ev.Object.(*api.Pod)
if exitCondition(pod) {
result = pod
w.Stop()
}
case watch.Deleted:
w.Stop()
case watch.Error:
result = nil
err = fmt.Errorf("failed to watch pod %s/%s", ns, name)
w.Stop()
}
return nil
})
return result, err
}
func waitForPodRunning(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodRunning:
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
case api.PodSucceeded, api.PodFailed:
return true
default:
return false
}
}
return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
}
func waitForPodTerminated(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed
}
return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
}
func handleAttachPod(f *cmdutil.Factory, c *client.Client, ns, name string, opts *AttachOptions, quiet bool) error {
pod, err := waitForPodRunning(c, ns, name, opts.Out, quiet)
if err != nil { if err != nil {
return err return err
} }
@ -363,7 +457,7 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A
if err != nil { if err != nil {
return err return err
} }
if status == api.PodSucceeded || status == api.PodFailed { if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName}) req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName})
if err != nil { if err != nil {
return err return err
@ -377,8 +471,8 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A
return err return err
} }
opts.Client = c opts.Client = c
opts.PodName = pod.Name opts.PodName = name
opts.Namespace = pod.Namespace opts.Namespace = ns
if err := opts.Run(); err != nil { if err := opts.Run(); err != nil {
fmt.Fprintf(opts.Out, "Error attaching, falling back to logs: %v\n", err) fmt.Fprintf(opts.Out, "Error attaching, falling back to logs: %v\n", err)
req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName}) req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName})

View File

@ -296,7 +296,7 @@ func TestTaint(t *testing.T) {
// Restore cmdutil behavior // Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal() cmdutil.DefaultBehaviorOnFatal()
}() }()
cmdutil.BehaviorOnFatal(func(e string) { saw_fatal = true; panic(e) }) cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetArgs(test.args) cmd.SetArgs(test.args)
cmd.Execute() cmd.Execute()
}() }()

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/util/strategicpatch"
@ -50,6 +51,7 @@ import (
const ( const (
ApplyAnnotationsFlag = "save-config" ApplyAnnotationsFlag = "save-config"
DefaultErrorExitCode = 1
) )
type debugError interface { type debugError interface {
@ -74,9 +76,9 @@ func AddSourceToErr(verb string, source string, err error) error {
var fatalErrHandler = fatal var fatalErrHandler = fatal
// BehaviorOnFatal allows you to override the default behavior when a fatal // BehaviorOnFatal allows you to override the default behavior when a fatal
// error occurs, which is call os.Exit(1). You can pass 'panic' as a function // error occurs, which is to call os.Exit(code). You can pass 'panic' as a function
// here if you prefer the panic() over os.Exit(1). // here if you prefer the panic() over os.Exit(1).
func BehaviorOnFatal(f func(string)) { func BehaviorOnFatal(f func(string, int)) {
fatalErrHandler = f fatalErrHandler = f
} }
@ -86,9 +88,10 @@ func DefaultBehaviorOnFatal() {
fatalErrHandler = fatal fatalErrHandler = fatal
} }
// fatal prints the message and then exits. If V(2) or greater, glog.Fatal // fatal prints the message if set and then exits. If V(2) or greater, glog.Fatal
// is invoked for extended information. // is invoked for extended information.
func fatal(msg string) { func fatal(msg string, code int) {
if len(msg) > 0 {
// add newline if needed // add newline if needed
if !strings.HasSuffix(msg, "\n") { if !strings.HasSuffix(msg, "\n") {
msg += "\n" msg += "\n"
@ -98,7 +101,8 @@ func fatal(msg string) {
glog.FatalDepth(2, msg) glog.FatalDepth(2, msg)
} }
fmt.Fprint(os.Stderr, msg) fmt.Fprint(os.Stderr, msg)
os.Exit(1) }
os.Exit(code)
} }
// CheckErr prints a user friendly error to STDERR and exits with a non-zero // CheckErr prints a user friendly error to STDERR and exits with a non-zero
@ -115,43 +119,42 @@ func checkErrWithPrefix(prefix string, err error) {
checkErr(prefix, err, fatalErrHandler) checkErr(prefix, err, fatalErrHandler)
} }
func checkErr(pref string, err error, handleErr func(string)) { // checkErr formats a given error as a string and calls the passed handleErr
if err == nil { // func with that string and an kubectl exit code.
return func checkErr(prefix string, err error, handleErr func(string, int)) {
}
if kerrors.IsInvalid(err) {
details := err.(*kerrors.StatusError).Status().Details
prefix := fmt.Sprintf("%sThe %s %q is invalid.\n", pref, details.Kind, details.Name)
errs := statusCausesToAggrError(details.Causes)
handleErr(MultilineError(prefix, errs))
return
}
if noMatch, ok := err.(*meta.NoResourceMatchError); ok {
switch { switch {
case len(noMatch.PartialResource.Group) > 0 && len(noMatch.PartialResource.Version) > 0: case err == nil:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q and version %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Group, noMatch.PartialResource.Version)) return
case len(noMatch.PartialResource.Group) > 0: case kerrors.IsInvalid(err):
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Group)) details := err.(*kerrors.StatusError).Status().Details
case len(noMatch.PartialResource.Version) > 0: s := fmt.Sprintf("%sThe %s %q is invalid", prefix, details.Kind, details.Name)
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in version %q", pref, noMatch.PartialResource.Resource, noMatch.PartialResource.Version)) if len(details.Causes) > 0 {
errs := statusCausesToAggrError(details.Causes)
handleErr(MultilineError(s+": ", errs), DefaultErrorExitCode)
} else {
handleErr(s, DefaultErrorExitCode)
}
case clientcmd.IsConfigurationInvalid(err):
handleErr(MultilineError(fmt.Sprintf("%sError in configuration: ", prefix), err), DefaultErrorExitCode)
default: default:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q", pref, noMatch.PartialResource.Resource)) switch err := err.(type) {
case *meta.NoResourceMatchError:
switch {
case len(err.PartialResource.Group) > 0 && len(err.PartialResource.Version) > 0:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q and version %q", prefix, err.PartialResource.Resource, err.PartialResource.Group, err.PartialResource.Version), DefaultErrorExitCode)
case len(err.PartialResource.Group) > 0:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in group %q", prefix, err.PartialResource.Resource, err.PartialResource.Group), DefaultErrorExitCode)
case len(err.PartialResource.Version) > 0:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q in version %q", prefix, err.PartialResource.Resource, err.PartialResource.Version), DefaultErrorExitCode)
default:
handleErr(fmt.Sprintf("%sthe server doesn't have a resource type %q", prefix, err.PartialResource.Resource), DefaultErrorExitCode)
} }
return case utilerrors.Aggregate:
} handleErr(MultipleErrors(prefix, err.Errors()), DefaultErrorExitCode)
case utilexec.ExitError:
// handle multiline errors // do not print anything, only terminate with given error
if clientcmd.IsConfigurationInvalid(err) { handleErr("", err.ExitStatus())
handleErr(MultilineError(fmt.Sprintf("%sError in configuration: ", pref), err)) default: // for any other error type
return
}
if agg, ok := err.(utilerrors.Aggregate); ok && len(agg.Errors()) > 0 {
handleErr(MultipleErrors(pref, agg.Errors()))
return
}
msg, ok := StandardErrorMessage(err) msg, ok := StandardErrorMessage(err)
if !ok { if !ok {
msg = err.Error() msg = err.Error()
@ -159,7 +162,9 @@ func checkErr(pref string, err error, handleErr func(string)) {
msg = fmt.Sprintf("error: %s", msg) msg = fmt.Sprintf("error: %s", msg)
} }
} }
handleErr(fmt.Sprintf("%s%s", pref, msg)) handleErr(msg, DefaultErrorExitCode)
}
}
} }
func statusCausesToAggrError(scs []unversioned.StatusCause) utilerrors.Aggregate { func statusCausesToAggrError(scs []unversioned.StatusCause) utilerrors.Aggregate {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
) )
@ -213,91 +214,88 @@ func (f *fileHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
res.Write(f.data) res.Write(f.data)
} }
func TestCheckInvalidErr(t *testing.T) { type checkErrTestCase struct {
tests := []struct {
err error err error
expected string expectedErr string
}{ expectedCode int
}
func TestCheckInvalidErr(t *testing.T) {
testCheckError(t, []checkErrTestCase{
{ {
errors.NewInvalid(api.Kind("Invalid1"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field"), "single", "details")}), errors.NewInvalid(api.Kind("Invalid1"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field"), "single", "details")}),
`error: The Invalid1 "invalidation" is invalid. field: Invalid value: "single": details`, "The Invalid1 \"invalidation\" is invalid: field: Invalid value: \"single\": details\n",
DefaultErrorExitCode,
}, },
{ {
errors.NewInvalid(api.Kind("Invalid2"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field1"), "multi1", "details"), field.Invalid(field.NewPath("field2"), "multi2", "details")}), errors.NewInvalid(api.Kind("Invalid2"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field1"), "multi1", "details"), field.Invalid(field.NewPath("field2"), "multi2", "details")}),
`error: The Invalid2 "invalidation" is invalid. * field1: Invalid value: "multi1": details, * field2: Invalid value: "multi2": details`, "The Invalid2 \"invalidation\" is invalid: \n* field1: Invalid value: \"multi1\": details\n* field2: Invalid value: \"multi2\": details\n",
DefaultErrorExitCode,
}, },
{ {
errors.NewInvalid(api.Kind("Invalid3"), "invalidation", field.ErrorList{}), errors.NewInvalid(api.Kind("Invalid3"), "invalidation", field.ErrorList{}),
`error: The Invalid3 "invalidation" is invalid. %!s(<nil>)`, "The Invalid3 \"invalidation\" is invalid",
DefaultErrorExitCode,
}, },
{ {
errors.NewInvalid(api.Kind("Invalid4"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field4"), "multi4", "details"), field.Invalid(field.NewPath("field4"), "multi4", "details")}), errors.NewInvalid(api.Kind("Invalid4"), "invalidation", field.ErrorList{field.Invalid(field.NewPath("field4"), "multi4", "details"), field.Invalid(field.NewPath("field4"), "multi4", "details")}),
`error: The Invalid4 "invalidation" is invalid. field4: Invalid value: "multi4": details`, "The Invalid4 \"invalidation\" is invalid: field4: Invalid value: \"multi4\": details\n",
DefaultErrorExitCode,
}, },
} })
var errReturned string
errHandle := func(err string) {
for _, v := range strings.Split(err, "\n") {
separator := " "
if errReturned == "" || v == "" {
separator = ""
} else if !strings.HasSuffix(errReturned, ".") {
separator = ", "
}
errReturned = fmt.Sprintf("%s%s%s", errReturned, separator, v)
}
if !strings.HasPrefix(errReturned, "error: ") {
errReturned = fmt.Sprintf("error: %s", errReturned)
}
if strings.HasSuffix(errReturned, ", ") {
errReturned = errReturned[:len(errReturned)-len(" ,")]
}
}
for _, test := range tests {
checkErr("", test.err, errHandle)
if errReturned != test.expected {
t.Fatalf("Got: %s, expected: %s", errReturned, test.expected)
}
errReturned = ""
}
} }
func TestCheckNoResourceMatchError(t *testing.T) { func TestCheckNoResourceMatchError(t *testing.T) {
tests := []struct { testCheckError(t, []checkErrTestCase{
err error
expected string
}{
{ {
&meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Resource: "foo"}}, &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Resource: "foo"}},
`the server doesn't have a resource type "foo"`, `the server doesn't have a resource type "foo"`,
DefaultErrorExitCode,
}, },
{ {
&meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Version: "theversion", Resource: "foo"}}, &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Version: "theversion", Resource: "foo"}},
`the server doesn't have a resource type "foo" in version "theversion"`, `the server doesn't have a resource type "foo" in version "theversion"`,
DefaultErrorExitCode,
}, },
{ {
&meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Version: "theversion", Resource: "foo"}}, &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Version: "theversion", Resource: "foo"}},
`the server doesn't have a resource type "foo" in group "thegroup" and version "theversion"`, `the server doesn't have a resource type "foo" in group "thegroup" and version "theversion"`,
DefaultErrorExitCode,
}, },
{ {
&meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Resource: "foo"}}, &meta.NoResourceMatchError{PartialResource: unversioned.GroupVersionResource{Group: "thegroup", Resource: "foo"}},
`the server doesn't have a resource type "foo" in group "thegroup"`, `the server doesn't have a resource type "foo" in group "thegroup"`,
DefaultErrorExitCode,
}, },
})
} }
func TestCheckExitError(t *testing.T) {
testCheckError(t, []checkErrTestCase{
{
uexec.CodeExitError{Err: fmt.Errorf("pod foo/bar terminated"), Code: 42},
"",
42,
},
})
}
func testCheckError(t *testing.T, tests []checkErrTestCase) {
var errReturned string var errReturned string
errHandle := func(err string) { var codeReturned int
errHandle := func(err string, code int) {
errReturned = err errReturned = err
codeReturned = code
} }
for _, test := range tests { for _, test := range tests {
checkErr("", test.err, errHandle) checkErr("", test.err, errHandle)
if errReturned != test.expected { if errReturned != test.expectedErr {
t.Fatalf("Got: %s, expected: %s", errReturned, test.expected) t.Fatalf("Got: %s, expected: %s", errReturned, test.expectedErr)
}
if codeReturned != test.expectedCode {
t.Fatalf("Got: %d, expected: %d", codeReturned, test.expectedCode)
} }
} }
} }

View File

@ -26,6 +26,7 @@ import (
dockertypes "github.com/docker/engine-api/types" dockertypes "github.com/docker/engine-api/types"
"github.com/golang/glog" "github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
) )
@ -74,7 +75,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
go io.Copy(stdout, p) go io.Copy(stdout, p)
} }
return command.Wait() err = command.Wait()
} else { } else {
if stdin != nil { if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects. // Use an os.Pipe here as it returns true *os.File objects.
@ -96,8 +97,13 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
command.Stderr = stderr command.Stderr = stderr
} }
return command.Run() err = command.Run()
} }
if exitErr, ok := err.(*exec.ExitError); ok {
return &utilexec.ExitErrorWrapper{ExitError: exitErr}
}
return err
} }
// NativeExecHandler executes commands in Docker containers using Docker's exec API. // NativeExecHandler executes commands in Docker containers using Docker's exec API.

View File

@ -17,12 +17,13 @@ limitations under the License.
package remotecommand package remotecommand
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"time" "time"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
@ -47,8 +48,12 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po
err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil { if err != nil {
msg := fmt.Sprintf("error attaching to container: %v", err) err = fmt.Errorf("error attaching to container: %v", err)
runtime.HandleError(errors.New(msg)) runtime.HandleError(err)
fmt.Fprint(ctx.errorStream, msg) ctx.writeStatus(apierrors.NewInternalError(err))
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusSuccess,
}})
} }
} }

View File

@ -36,6 +36,11 @@ const (
// attachment/execution. It is the third version of the subprotocol and // attachment/execution. It is the third version of the subprotocol and
// adds support for resizing container terminals. // adds support for resizing container terminals.
StreamProtocolV3Name = "v3.channel.k8s.io" StreamProtocolV3Name = "v3.channel.k8s.io"
// The SPDY subprotocol "v4.channel.k8s.io" is used for remote command
// attachment/execution. It is the 4th version of the subprotocol and
// adds support for exit codes.
StreamProtocolV4Name = "v4.channel.k8s.io"
) )
var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name} var SupportedStreamingProtocols = []string{StreamProtocolV4Name, StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}

View File

@ -17,18 +17,25 @@ limitations under the License.
package remotecommand package remotecommand
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
) )
const (
NonZeroExitCodeReason = unversioned.StatusReason("NonZeroExitCode")
ExitCodeCauseType = unversioned.CauseType("ExitCode")
)
// Executor knows how to execute a command in a container in a pod. // Executor knows how to execute a command in a container in a pod.
type Executor interface { type Executor interface {
// ExecInContainer executes a command in a container in the pod, copying data // ExecInContainer executes a command in a container in the pod, copying data
@ -51,8 +58,29 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil { if err != nil {
msg := fmt.Sprintf("error executing command in container: %v", err) if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
runtime.HandleError(errors.New(msg)) rc := exitErr.ExitStatus()
fmt.Fprint(ctx.errorStream, msg) ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusFailure,
Reason: NonZeroExitCodeReason,
Details: &unversioned.StatusDetails{
Causes: []unversioned.StatusCause{
{
Type: ExitCodeCauseType,
Message: fmt.Sprintf("%d", rc),
},
},
},
Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
}})
} else {
err = fmt.Errorf("error executing command in container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
}
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusSuccess,
}})
} }
} }

View File

@ -25,6 +25,8 @@ import (
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
@ -88,7 +90,7 @@ type context struct {
stdinStream io.ReadCloser stdinStream io.ReadCloser
stdoutStream io.WriteCloser stdoutStream io.WriteCloser
stderrStream io.WriteCloser stderrStream io.WriteCloser
errorStream io.WriteCloser writeStatus func(status *apierrors.StatusError) error
resizeStream io.ReadCloser resizeStream io.ReadCloser
resizeChan chan term.Size resizeChan chan term.Size
tty bool tty bool
@ -168,6 +170,8 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt
var handler protocolHandler var handler protocolHandler
switch protocol { switch protocol {
case StreamProtocolV4Name:
handler = &v4ProtocolHandler{}
case StreamProtocolV3Name: case StreamProtocolV3Name:
handler = &v3ProtocolHandler{} handler = &v3ProtocolHandler{}
case StreamProtocolV2Name: case StreamProtocolV2Name:
@ -206,6 +210,59 @@ type protocolHandler interface {
supportsTerminalResizing() bool supportsTerminalResizing() bool
} }
// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
// in from v3 in the error stream format using an json-marshaled unversioned.Status which carries
// the process' exit code.
type v4ProtocolHandler struct{}
func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v4ProtocolHandler supports it
func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
// v3ProtocolHandler implements the V3 protocol version for streaming command execution. // v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{} type v3ProtocolHandler struct{}
@ -222,7 +279,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType) streamType := stream.Headers().Get(api.StreamType)
switch streamType { switch streamType {
case api.StreamTypeError: case api.StreamTypeError:
ctx.errorStream = stream ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop) go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin: case api.StreamTypeStdin:
ctx.stdinStream = stream ctx.stdinStream = stream
@ -273,7 +330,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType) streamType := stream.Headers().Get(api.StreamType)
switch streamType { switch streamType {
case api.StreamTypeError: case api.StreamTypeError:
ctx.errorStream = stream ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop) go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin: case api.StreamTypeStdin:
ctx.stdinStream = stream ctx.stdinStream = stream
@ -321,7 +378,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType) streamType := stream.Headers().Get(api.StreamType)
switch streamType { switch streamType {
case api.StreamTypeError: case api.StreamTypeError:
ctx.errorStream = stream ctx.writeStatus = v1WriteStatusFunc(stream)
// This defer statement shouldn't be here, but due to previous refactoring, it ended up in // This defer statement shouldn't be here, but due to previous refactoring, it ended up in
// here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
@ -375,3 +432,26 @@ func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
channel <- size channel <- size
} }
} }
func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
if status.Status().Status == unversioned.StatusSuccess {
return nil // send error messages
}
_, err := stream.Write([]byte(status.Error()))
return err
}
}
// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
// as json in the error channel.
func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}
_, err = stream.Write(bs)
return err
}
}

View File

@ -32,6 +32,11 @@ const (
stderrChannel stderrChannel
errorChannel errorChannel
resizeChannel resizeChannel
preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
) )
// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) // createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
@ -67,9 +72,30 @@ func writeChannel(real bool) wsstream.ChannelType {
// streams needed to perform an exec or an attach. // streams needed to perform an exec or an attach.
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) { func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) {
channels := createChannels(opts) channels := createChannels(opts)
conn := wsstream.NewConn(channels...) conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Binary: true,
Channels: channels,
},
preV4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
preV4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
v4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
v4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
})
conn.SetIdleTimeout(idleTimeout) conn.SetIdleTimeout(idleTimeout)
streams, err := conn.Open(httplog.Unlogged(w), req) negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(w), req)
if err != nil { if err != nil {
runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
return nil, false return nil, false
@ -86,13 +112,21 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
streams[errorChannel].Write([]byte{}) streams[errorChannel].Write([]byte{})
} }
return &context{ ctx := &context{
conn: conn, conn: conn,
stdinStream: streams[stdinChannel], stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel], stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel], stderrStream: streams[stderrChannel],
errorStream: streams[errorChannel],
tty: opts.tty, tty: opts.tty,
resizeStream: streams[resizeChannel], resizeStream: streams[resizeChannel],
}, true }
switch negotiatedProtocol {
case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel])
default:
ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel])
}
return ctx, true
} }

View File

@ -113,7 +113,7 @@ func (cmd *cmdWrapper) Output() ([]byte, error) {
func handleError(err error) error { func handleError(err error) error {
if ee, ok := err.(*osexec.ExitError); ok { if ee, ok := err.(*osexec.ExitError); ok {
// Force a compile fail if exitErrorWrapper can't convert to ExitError. // Force a compile fail if exitErrorWrapper can't convert to ExitError.
var x ExitError = &exitErrorWrapper{ee} var x ExitError = &ExitErrorWrapper{ee}
return x return x
} }
if ee, ok := err.(*osexec.Error); ok { if ee, ok := err.(*osexec.Error); ok {
@ -124,17 +124,44 @@ func handleError(err error) error {
return err return err
} }
// exitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError. // ExitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError.
// Note: standard exec.ExitError is type *os.ProcessState, which already implements Exited(). // Note: standard exec.ExitError is type *os.ProcessState, which already implements Exited().
type exitErrorWrapper struct { type ExitErrorWrapper struct {
*osexec.ExitError *osexec.ExitError
} }
var _ ExitError = ExitErrorWrapper{}
// ExitStatus is part of the ExitError interface. // ExitStatus is part of the ExitError interface.
func (eew exitErrorWrapper) ExitStatus() int { func (eew ExitErrorWrapper) ExitStatus() int {
ws, ok := eew.Sys().(syscall.WaitStatus) ws, ok := eew.Sys().(syscall.WaitStatus)
if !ok { if !ok {
panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper") panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper")
} }
return ws.ExitStatus() return ws.ExitStatus()
} }
// CodeExitError is an implementation of ExitError consisting of an error object
// and an exit code (the upper bits of os.exec.ExitStatus).
type CodeExitError struct {
Err error
Code int
}
var _ ExitError = CodeExitError{}
func (e CodeExitError) Error() string {
return e.Err.Error()
}
func (e CodeExitError) String() string {
return e.Err.Error()
}
func (e CodeExitError) Exited() bool {
return true
}
func (e CodeExitError) ExitStatus() int {
return e.Code
}

View File

@ -27,6 +27,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
) )
@ -44,7 +45,7 @@ import (
// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT) // READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
// CLOSE // CLOSE
// //
const channelWebSocketProtocol = "channel.k8s.io" const ChannelWebSocketProtocol = "channel.k8s.io"
// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character // The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
// indicating the channel number (zero indexed) the message was sent on. Messages in both directions // indicating the channel number (zero indexed) the message was sent on. Messages in both directions
@ -60,7 +61,7 @@ const channelWebSocketProtocol = "channel.k8s.io"
// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT) // READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
// CLOSE // CLOSE
// //
const base64ChannelWebSocketProtocol = "base64.channel.k8s.io" const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
type codecType int type codecType int
@ -107,8 +108,9 @@ func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
func handshake(config *websocket.Config, req *http.Request, allowed []string) error { func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
protocols := config.Protocol protocols := config.Protocol
if len(protocols) == 0 { if len(protocols) == 0 {
return nil protocols = []string{""}
} }
for _, protocol := range protocols { for _, protocol := range protocols {
for _, allow := range allowed { for _, allow := range allowed {
if allow == protocol { if allow == protocol {
@ -117,12 +119,31 @@ func handshake(config *websocket.Config, req *http.Request, allowed []string) er
} }
} }
} }
return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed) return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
} }
// ChannelProtocolConfig describes a websocket subprotocol with channels.
type ChannelProtocolConfig struct {
Binary bool
Channels []ChannelType
}
// NewDefaultChannelProtocols returns a channel protocol map with the
// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
// channels.
func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
return map[string]ChannelProtocolConfig{
"": {Binary: true, Channels: channels},
ChannelWebSocketProtocol: {Binary: true, Channels: channels},
Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
}
}
// Conn supports sending multiple binary channels over a websocket connection. // Conn supports sending multiple binary channels over a websocket connection.
// Supports only the "channel.k8s.io" subprotocol.
type Conn struct { type Conn struct {
protocols map[string]ChannelProtocolConfig
selectedProtocol string
channels []*websocketChannel channels []*websocketChannel
codec codecType codec codecType
ready chan struct{} ready chan struct{}
@ -134,24 +155,14 @@ type Conn struct {
// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for // web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
// future use. The channel types for each channel are passed as an array, supporting the different // future use. The channel types for each channel are passed as an array, supporting the different
// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer. // duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
func NewConn(channels ...ChannelType) *Conn { //
conn := &Conn{ // The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
// name is used if websocket.Config.Protocol is empty.
func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
return &Conn{
ready: make(chan struct{}), ready: make(chan struct{}),
channels: make([]*websocketChannel, len(channels)), protocols: protocols,
} }
for i := range conn.channels {
switch channels[i] {
case ReadChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
case WriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
case ReadWriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
case IgnoreChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
}
}
return conn
} }
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified, // SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
@ -160,8 +171,9 @@ func (conn *Conn) SetIdleTimeout(duration time.Duration) {
conn.timeout = duration conn.timeout = duration
} }
// Open the connection and create channels for reading and writing. // Open the connection and create channels for reading and writing. It returns
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) { // the selected subprotocol, a slice of channels and an error.
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
go func() { go func() {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer conn.Close() defer conn.Close()
@ -172,23 +184,42 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWrite
for i := range conn.channels { for i := range conn.channels {
rwc[i] = conn.channels[i] rwc[i] = conn.channels[i]
} }
return rwc, nil return conn.selectedProtocol, rwc, nil
} }
func (conn *Conn) initialize(ws *websocket.Conn) { func (conn *Conn) initialize(ws *websocket.Conn) {
protocols := ws.Config().Protocol negotiated := ws.Config().Protocol
switch { conn.selectedProtocol = negotiated[0]
case len(protocols) == 0, protocols[0] == channelWebSocketProtocol: p := conn.protocols[conn.selectedProtocol]
if p.Binary {
conn.codec = rawCodec conn.codec = rawCodec
case protocols[0] == base64ChannelWebSocketProtocol: } else {
conn.codec = base64Codec conn.codec = base64Codec
} }
conn.ws = ws conn.ws = ws
conn.channels = make([]*websocketChannel, len(p.Channels))
for i, t := range p.Channels {
switch t {
case ReadChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
case WriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
case ReadWriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
case IgnoreChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
}
}
close(conn.ready) close(conn.ready)
} }
func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error { func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
return handshake(config, req, []string{channelWebSocketProtocol, base64ChannelWebSocketProtocol}) supportedProtocols := make([]string, 0, len(conn.protocols))
for p := range conn.protocols {
supportedProtocols = append(supportedProtocols, p)
}
return handshake(config, req, supportedProtocols)
} }
func (conn *Conn) resetTimeout() { func (conn *Conn) resetTimeout() {

View File

@ -20,6 +20,7 @@ import (
"encoding/base64" "encoding/base64"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"sync" "sync"
@ -28,15 +29,19 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
func newServer(handler websocket.Handler) (*httptest.Server, string) { func newServer(handler http.Handler) (*httptest.Server, string) {
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
serverAddr := server.Listener.Addr().String() serverAddr := server.Listener.Addr().String()
return server, serverAddr return server, serverAddr
} }
func TestRawConn(t *testing.T) { func TestRawConn(t *testing.T) {
conn := NewConn(ReadWriteChannel, ReadWriteChannel, IgnoreChannel, ReadChannel, WriteChannel) channels := []ChannelType{ReadWriteChannel, ReadWriteChannel, IgnoreChannel, ReadChannel, WriteChannel}
s, addr := newServer(conn.handle) conn := NewConn(NewDefaultChannelProtocols(channels))
s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn.Open(w, req)
}))
defer s.Close() defer s.Close()
client, err := websocket.Dial("ws://"+addr, "", "http://localhost/") client, err := websocket.Dial("ws://"+addr, "", "http://localhost/")
@ -112,8 +117,10 @@ func TestRawConn(t *testing.T) {
} }
func TestBase64Conn(t *testing.T) { func TestBase64Conn(t *testing.T) {
conn := NewConn(ReadWriteChannel, ReadWriteChannel) conn := NewConn(NewDefaultChannelProtocols([]ChannelType{ReadWriteChannel, ReadWriteChannel}))
s, addr := newServer(conn.handle) s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
conn.Open(w, req)
}))
defer s.Close() defer s.Close()
config, err := websocket.NewConfig("ws://"+addr, "http://localhost/") config, err := websocket.NewConfig("ws://"+addr, "http://localhost/")
@ -167,3 +174,99 @@ func TestBase64Conn(t *testing.T) {
client.Close() client.Close()
wg.Wait() wg.Wait()
} }
type versionTest struct {
supported map[string]bool // protocol -> binary
requested []string
error bool
expected string
}
func versionTests() []versionTest {
const (
binary = true
base64 = false
)
return []versionTest{
{
supported: nil,
requested: []string{"raw"},
error: true,
},
{
supported: map[string]bool{"": binary, "raw": binary, "base64": base64},
requested: nil,
expected: "",
},
{
supported: map[string]bool{"": binary, "raw": binary, "base64": base64},
requested: []string{"v1.raw"},
error: true,
},
{
supported: map[string]bool{"": binary, "raw": binary, "base64": base64},
requested: []string{"v1.raw", "v1.base64"},
error: true,
}, {
supported: map[string]bool{"": binary, "raw": binary, "base64": base64},
requested: []string{"v1.raw", "raw"},
expected: "raw",
},
{
supported: map[string]bool{"": binary, "v1.raw": binary, "v1.base64": base64, "v2.raw": binary, "v2.base64": base64},
requested: []string{"v1.raw"},
expected: "v1.raw",
},
{
supported: map[string]bool{"": binary, "v1.raw": binary, "v1.base64": base64, "v2.raw": binary, "v2.base64": base64},
requested: []string{"v2.base64"},
expected: "v2.base64",
},
}
}
func TestVersionedConn(t *testing.T) {
for i, test := range versionTests() {
func() {
supportedProtocols := map[string]ChannelProtocolConfig{}
for p, binary := range test.supported {
supportedProtocols[p] = ChannelProtocolConfig{
Binary: binary,
Channels: []ChannelType{ReadWriteChannel},
}
}
conn := NewConn(supportedProtocols)
// note that it's not enough to wait for conn.ready to avoid a race here. Hence,
// we use a channel.
selectedProtocol := make(chan string, 0)
s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
p, _, _ := conn.Open(w, req)
selectedProtocol <- p
}))
defer s.Close()
config, err := websocket.NewConfig("ws://"+addr, "http://localhost/")
if err != nil {
t.Fatal(err)
}
config.Protocol = test.requested
client, err := websocket.DialConfig(config)
if err != nil {
if !test.error {
t.Fatalf("test %d: didn't expect error: %v", i, err)
} else {
return
}
}
defer client.Close()
if test.error && err == nil {
t.Fatalf("test %d: expected an error", i)
}
<-conn.ready
if got, expected := <-selectedProtocol, test.expected; got != expected {
t.Fatalf("test %d: unexpected protocol version: got=%s expected=%s", i, got, expected)
}
}()
}
}

View File

@ -23,6 +23,7 @@ import (
"time" "time"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
) )
@ -37,23 +38,46 @@ const binaryWebSocketProtocol = "binary.k8s.io"
// possible. // possible.
const base64BinaryWebSocketProtocol = "base64.binary.k8s.io" const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
// ReaderProtocolConfig describes a websocket subprotocol with one stream.
type ReaderProtocolConfig struct {
Binary bool
}
// NewDefaultReaderProtocols returns a stream protocol map with the
// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io".
func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
return map[string]ReaderProtocolConfig{
"": {Binary: true},
binaryWebSocketProtocol: {Binary: true},
base64BinaryWebSocketProtocol: {Binary: false},
}
}
// Reader supports returning an arbitrary byte stream over a websocket channel. // Reader supports returning an arbitrary byte stream over a websocket channel.
// Supports the "binary.k8s.io" and "base64.binary.k8s.io" subprotocols.
type Reader struct { type Reader struct {
err chan error err chan error
r io.Reader r io.Reader
ping bool ping bool
timeout time.Duration timeout time.Duration
protocols map[string]ReaderProtocolConfig
selectedProtocol string
handleCrash func() // overridable for testing
} }
// NewReader creates a WebSocket pipe that will copy the contents of r to a provided // NewReader creates a WebSocket pipe that will copy the contents of r to a provided
// WebSocket connection. If ping is true, a zero length message will be sent to the client // WebSocket connection. If ping is true, a zero length message will be sent to the client
// before the stream begins reading. // before the stream begins reading.
func NewReader(r io.Reader, ping bool) *Reader { //
// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
// subprotocol name is used if websocket.Config.Protocol is empty.
func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
return &Reader{ return &Reader{
r: r, r: r,
err: make(chan error), err: make(chan error),
ping: ping, ping: ping,
protocols: protocols,
handleCrash: func() { runtime.HandleCrash() },
} }
} }
@ -64,14 +88,18 @@ func (r *Reader) SetIdleTimeout(duration time.Duration) {
} }
func (r *Reader) handshake(config *websocket.Config, req *http.Request) error { func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
return handshake(config, req, []string{binaryWebSocketProtocol, base64BinaryWebSocketProtocol}) supportedProtocols := make([]string, 0, len(r.protocols))
for p := range r.protocols {
supportedProtocols = append(supportedProtocols, p)
}
return handshake(config, req, supportedProtocols)
} }
// Copy the reader to the response. The created WebSocket is closed after this // Copy the reader to the response. The created WebSocket is closed after this
// method completes. // method completes.
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
go func() { go func() {
defer runtime.HandleCrash() defer r.handleCrash()
websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req) websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
}() }()
return <-r.err return <-r.err
@ -79,11 +107,12 @@ func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
// handle implements a WebSocket handler. // handle implements a WebSocket handler.
func (r *Reader) handle(ws *websocket.Conn) { func (r *Reader) handle(ws *websocket.Conn) {
encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol negotiated := ws.Config().Protocol
r.selectedProtocol = negotiated[0]
defer close(r.err) defer close(r.err)
defer ws.Close() defer ws.Close()
go IgnoreReceives(ws, r.timeout) go IgnoreReceives(ws, r.timeout)
r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout) r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
} }
func resetTimeout(ws *websocket.Conn, timeout time.Duration) { func resetTimeout(ws *websocket.Conn, timeout time.Duration) {

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
@ -32,7 +33,7 @@ import (
func TestStream(t *testing.T) { func TestStream(t *testing.T) {
input := "some random text" input := "some random text"
r := NewReader(bytes.NewBuffer([]byte(input)), true) r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
r.SetIdleTimeout(time.Second) r.SetIdleTimeout(time.Second)
data, err := readWebSocket(r, t, nil) data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) { if !reflect.DeepEqual(data, []byte(input)) {
@ -45,7 +46,7 @@ func TestStream(t *testing.T) {
func TestStreamPing(t *testing.T) { func TestStreamPing(t *testing.T) {
input := "some random text" input := "some random text"
r := NewReader(bytes.NewBuffer([]byte(input)), true) r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
r.SetIdleTimeout(time.Second) r.SetIdleTimeout(time.Second)
err := expectWebSocketFrames(r, t, nil, [][]byte{ err := expectWebSocketFrames(r, t, nil, [][]byte{
{}, {},
@ -59,8 +60,8 @@ func TestStreamPing(t *testing.T) {
func TestStreamBase64(t *testing.T) { func TestStreamBase64(t *testing.T) {
input := "some random text" input := "some random text"
encoded := base64.StdEncoding.EncodeToString([]byte(input)) encoded := base64.StdEncoding.EncodeToString([]byte(input))
r := NewReader(bytes.NewBuffer([]byte(input)), true) r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
data, err := readWebSocket(r, t, nil, base64BinaryWebSocketProtocol) data, err := readWebSocket(r, t, nil, "base64.binary.k8s.io")
if !reflect.DeepEqual(data, []byte(encoded)) { if !reflect.DeepEqual(data, []byte(encoded)) {
t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded)) t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded))
} }
@ -69,6 +70,73 @@ func TestStreamBase64(t *testing.T) {
} }
} }
func TestStreamVersionedBase64(t *testing.T) {
input := "some random text"
encoded := base64.StdEncoding.EncodeToString([]byte(input))
r := NewReader(bytes.NewBuffer([]byte(input)), true, map[string]ReaderProtocolConfig{
"": {Binary: true},
"binary.k8s.io": {Binary: true},
"base64.binary.k8s.io": {Binary: false},
"v1.binary.k8s.io": {Binary: true},
"v1.base64.binary.k8s.io": {Binary: false},
"v2.binary.k8s.io": {Binary: true},
"v2.base64.binary.k8s.io": {Binary: false},
})
data, err := readWebSocket(r, t, nil, "v2.base64.binary.k8s.io")
if !reflect.DeepEqual(data, []byte(encoded)) {
t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded))
}
if err != nil {
t.Fatal(err)
}
}
func TestStreamVersionedCopy(t *testing.T) {
for i, test := range versionTests() {
func() {
supportedProtocols := map[string]ReaderProtocolConfig{}
for p, binary := range test.supported {
supportedProtocols[p] = ReaderProtocolConfig{
Binary: binary,
}
}
input := "some random text"
r := NewReader(bytes.NewBuffer([]byte(input)), true, supportedProtocols)
s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
err := r.Copy(w, req)
if err != nil {
w.WriteHeader(503)
}
}))
defer s.Close()
config, err := websocket.NewConfig("ws://"+addr, "http://localhost/")
if err != nil {
t.Error(err)
return
}
config.Protocol = test.requested
client, err := websocket.DialConfig(config)
if err != nil {
if !test.error {
t.Errorf("test %d: didn't expect error: %v", i, err)
}
return
}
defer client.Close()
if test.error && err == nil {
t.Errorf("test %d: expected an error", i)
return
}
<-r.err
if got, expected := r.selectedProtocol, test.expected; got != expected {
t.Errorf("test %d: unexpected protocol version: got=%s expected=%s", i, got, expected)
}
}()
}
}
func TestStreamError(t *testing.T) { func TestStreamError(t *testing.T) {
input := "some random text" input := "some random text"
errs := &errorReader{ errs := &errorReader{
@ -78,7 +146,7 @@ func TestStreamError(t *testing.T) {
}, },
err: fmt.Errorf("bad read"), err: fmt.Errorf("bad read"),
} }
r := NewReader(errs, false) r := NewReader(errs, false, NewDefaultReaderProtocols())
data, err := readWebSocket(r, t, nil) data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) { if !reflect.DeepEqual(data, []byte(input)) {
@ -98,7 +166,10 @@ func TestStreamSurvivesPanic(t *testing.T) {
}, },
panicMessage: "bad read", panicMessage: "bad read",
} }
r := NewReader(errs, false) r := NewReader(errs, false, NewDefaultReaderProtocols())
// do not call runtime.HandleCrash() in handler. Otherwise, the tests are interrupted.
r.handleCrash = func() { recover() }
data, err := readWebSocket(r, t, nil) data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) { if !reflect.DeepEqual(data, []byte(input)) {
@ -121,7 +192,7 @@ func TestStreamClosedDuringRead(t *testing.T) {
err: fmt.Errorf("stuff"), err: fmt.Errorf("stuff"),
pause: ch, pause: ch,
} }
r := NewReader(errs, false) r := NewReader(errs, false, NewDefaultReaderProtocols())
data, err := readWebSocket(r, t, func(c *websocket.Conn) { data, err := readWebSocket(r, t, func(c *websocket.Conn) {
c.Close() c.Close()
@ -163,19 +234,13 @@ func (r *errorReader) Read(p []byte) (int, error) {
func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols ...string) ([]byte, error) { func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols ...string) ([]byte, error) {
errCh := make(chan error, 1) errCh := make(chan error, 1)
s, addr := newServer(func(ws *websocket.Conn) { s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
cfg := ws.Config() errCh <- r.Copy(w, req)
cfg.Protocol = protocols }))
go IgnoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err
}()
r.handle(ws)
})
defer s.Close() defer s.Close()
config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr) config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr)
config.Protocol = protocols
client, err := websocket.DialConfig(config) client, err := websocket.DialConfig(config)
if err != nil { if err != nil {
return nil, err return nil, err
@ -195,19 +260,13 @@ func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols
func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), frames [][]byte, protocols ...string) error { func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), frames [][]byte, protocols ...string) error {
errCh := make(chan error, 1) errCh := make(chan error, 1)
s, addr := newServer(func(ws *websocket.Conn) { s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
cfg := ws.Config() errCh <- r.Copy(w, req)
cfg.Protocol = protocols }))
go IgnoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err
}()
r.handle(ws)
})
defer s.Close() defer s.Close()
config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr) config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr)
config.Protocol = protocols
ws, err := websocket.DialConfig(config) ws, err := websocket.DialConfig(config)
if err != nil { if err != nil {
return err return err

View File

@ -36,6 +36,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
@ -62,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
sshutil "k8s.io/kubernetes/pkg/ssh" sshutil "k8s.io/kubernetes/pkg/ssh"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
uexec "k8s.io/kubernetes/pkg/util/exec"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/system" "k8s.io/kubernetes/pkg/util/system"
@ -1996,7 +1998,7 @@ func (b kubectlBuilder) Exec() (string, error) {
Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return "", fmt.Errorf("Error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err) return "", fmt.Errorf("error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err)
} }
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
@ -2005,11 +2007,19 @@ func (b kubectlBuilder) Exec() (string, error) {
select { select {
case err := <-errCh: case err := <-errCh:
if err != nil { if err != nil {
return "", fmt.Errorf("Error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err) var rc int = 127
if ee, ok := err.(*exec.ExitError); ok {
Logf("rc: %d", rc)
rc = int(ee.Sys().(syscall.WaitStatus).ExitStatus())
}
return "", uexec.CodeExitError{
Err: fmt.Errorf("error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err),
Code: rc,
}
} }
case <-b.timeout: case <-b.timeout:
b.cmd.Process.Kill() b.cmd.Process.Kill()
return "", fmt.Errorf("Timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr) return "", fmt.Errorf("timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr)
} }
Logf("stderr: %q", stderr.String()) Logf("stderr: %q", stderr.String())
return stdout.String(), nil return stdout.String(), nil

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic/registry" "k8s.io/kubernetes/pkg/registry/generic/registry"
uexec "k8s.io/kubernetes/pkg/util/exec"
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -348,6 +349,49 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
} }
}) })
It("should return command exit codes", func() {
nsFlag := fmt.Sprintf("--namespace=%v", ns)
By("execing into a container with a successful command")
_, err := framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 0").Exec()
ExpectNoError(err)
By("execing into a container with a failing command")
_, err = framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 42").Exec()
ee, ok := err.(uexec.ExitError)
Expect(ok).To(Equal(true))
Expect(ee.ExitStatus()).To(Equal(42))
By("running a successful command")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "success", "--", "/bin/sh", "-c", "exit 0").Exec()
ExpectNoError(err)
By("running a failing command")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-1", "--", "/bin/sh", "-c", "exit 42").Exec()
ee, ok = err.(uexec.ExitError)
Expect(ok).To(Equal(true))
Expect(ee.ExitStatus()).To(Equal(42))
By("running a failing command without --restart=Never")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "failure-2", "--", "/bin/sh", "-c", "cat && exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
By("running a failing command without --restart=Never, but with --rm")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "--rm", "failure-3", "--", "/bin/sh", "-c", "cat && exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
framework.WaitForPodToDisappear(f.Client, ns, "failure-3", labels.Everything(), 2*time.Second, wait.ForeverTestTimeout)
By("running a failing command with --leave-stdin-open")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-4", "--leave-stdin-open", "--", "/bin/sh", "-c", "exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
})
It("should support inline execution and attach", func() { It("should support inline execution and attach", func() {
framework.SkipIfContainerRuntimeIs("rkt") // #23335 framework.SkipIfContainerRuntimeIs("rkt") // #23335
framework.SkipUnlessServerVersionGTE(jobsVersion, c) framework.SkipUnlessServerVersionGTE(jobsVersion, c)