diff --git a/pkg/client/unversioned/remotecommand/BUILD b/pkg/client/unversioned/remotecommand/BUILD index c9b024ce83b..87f85d63f9e 100644 --- a/pkg/client/unversioned/remotecommand/BUILD +++ b/pkg/client/unversioned/remotecommand/BUILD @@ -54,5 +54,6 @@ go_test( "//pkg/util/httpstream:go_default_library", "//pkg/util/term:go_default_library", "//pkg/util/wait:go_default_library", + "//vendor:github.com/stretchr/testify/require", ], ) diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 638493a6b1a..16bb1357176 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/client/restclient" @@ -118,10 +120,13 @@ func fakeServer(t *testing.T, testName string, exec bool, stdinData, stdoutData, exec: exec, } + opts, err := remotecommand.NewOptions(req) + require.NoError(t, err) if exec { - remotecommand.ServeExec(w, req, executor, "pod", "uid", "container", 0, 10*time.Second, serverProtocols) + cmd := req.URL.Query()[api.ExecCommandParamm] + remotecommand.ServeExec(w, req, executor, "pod", "uid", "container", cmd, opts, 0, 10*time.Second, serverProtocols) } else { - remotecommand.ServeAttach(w, req, executor, "pod", "uid", "container", 0, 10*time.Second, serverProtocols) + remotecommand.ServeAttach(w, req, executor, "pod", "uid", "container", opts, 0, 10*time.Second, serverProtocols) } if e, a := strings.Repeat(stdinData, messageCount), executor.stdinReceived.String(); e != a { diff --git a/pkg/kubelet/server/BUILD b/pkg/kubelet/server/BUILD index 93b6a11e09a..4511eeca905 100644 --- a/pkg/kubelet/server/BUILD +++ b/pkg/kubelet/server/BUILD @@ -39,6 +39,7 @@ go_library( "//pkg/util/configz:go_default_library", "//pkg/util/flushwriter:go_default_library", "//pkg/util/limitwriter:go_default_library", + "//pkg/util/runtime:go_default_library", "//pkg/util/term:go_default_library", "//pkg/volume:go_default_library", "//vendor:github.com/emicklei/go-restful", diff --git a/pkg/kubelet/server/remotecommand/attach.go b/pkg/kubelet/server/remotecommand/attach.go index 9cef7abc962..230de3ef9db 100644 --- a/pkg/kubelet/server/remotecommand/attach.go +++ b/pkg/kubelet/server/remotecommand/attach.go @@ -38,8 +38,8 @@ type Attacher interface { // ServeAttach handles requests to attach to a container. After creating/receiving the required // streams, it delegates the actual attaching to attacher. -func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { - ctx, ok := createStreams(req, w, supportedProtocols, idleTimeout, streamCreationTimeout) +func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { + ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) if !ok { // error is handled by createStreams return diff --git a/pkg/kubelet/server/remotecommand/exec.go b/pkg/kubelet/server/remotecommand/exec.go index d3319b1251e..9e669778673 100644 --- a/pkg/kubelet/server/remotecommand/exec.go +++ b/pkg/kubelet/server/remotecommand/exec.go @@ -22,7 +22,6 @@ import ( "net/http" "time" - "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/types" @@ -46,16 +45,14 @@ type Executor interface { // ServeExec handles requests to execute a command in a container. After // creating/receiving the required streams, it delegates the actual execution // to the executor. -func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { - ctx, ok := createStreams(req, w, supportedProtocols, idleTimeout, streamCreationTimeout) +func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { + ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) if !ok { // error is handled by createStreams return } defer ctx.conn.Close() - cmd := req.URL.Query()[api.ExecCommandParamm] - err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) if err != nil { if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { diff --git a/pkg/kubelet/server/remotecommand/httpstream.go b/pkg/kubelet/server/remotecommand/httpstream.go index 4816e214462..b306ce5bfb8 100644 --- a/pkg/kubelet/server/remotecommand/httpstream.go +++ b/pkg/kubelet/server/remotecommand/httpstream.go @@ -39,11 +39,10 @@ import ( // Options contains details about which streams are required for // remote command execution. type Options struct { - Stdin bool - Stdout bool - Stderr bool - TTY bool - expectedStreams int + Stdin bool + Stdout bool + Stderr bool + TTY bool } // NewOptions creates a new Options from the Request. @@ -58,28 +57,15 @@ func NewOptions(req *http.Request) (*Options, error) { stderr = false } - // count the streams client asked for, starting with 1 - expectedStreams := 1 - if stdin { - expectedStreams++ - } - if stdout { - expectedStreams++ - } - if stderr { - expectedStreams++ - } - - if expectedStreams == 1 { + if !stdin && !stdout && !stderr { return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr") } return &Options{ - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - TTY: tty, - expectedStreams: expectedStreams, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + TTY: tty, }, nil } @@ -115,15 +101,7 @@ func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-c } } -func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { - opts, err := NewOptions(req) - if err != nil { - runtime.HandleError(err) - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, err.Error()) - return nil, false - } - +func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { var ctx *context var ok bool if wsstream.IsWebSocketRequest(req) { @@ -183,14 +161,25 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Opt handler = &v1ProtocolHandler{} } + // count the streams client asked for, starting with 1 + expectedStreams := 1 + if opts.Stdin { + expectedStreams++ + } + if opts.Stdout { + expectedStreams++ + } + if opts.Stderr { + expectedStreams++ + } if opts.TTY && handler.supportsTerminalResizing() { - opts.expectedStreams++ + expectedStreams++ } expired := time.NewTimer(streamCreationTimeout) defer expired.Stop() - ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C) + ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C) if err != nil { runtime.HandleError(err) return nil, false diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 6335d73c583..bfc8dd43064 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -56,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/flushwriter" "k8s.io/kubernetes/pkg/util/limitwriter" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -575,30 +576,27 @@ type requestParams struct { podUID types.UID containerName string cmd []string - streamOpts remotecommand.Options } func getRequestParams(req *restful.Request) requestParams { - streamOpts, err := remotecommand.NewOptions(req.Request) - if err != nil { - glog.Warningf("Unable to parse request stream options: %v", err) - } - if streamOpts == nil { - streamOpts = &remotecommand.Options{} - } return requestParams{ podNamespace: req.PathParameter("podNamespace"), podName: req.PathParameter("podID"), podUID: types.UID(req.PathParameter("uid")), containerName: req.PathParameter("containerName"), cmd: req.Request.URL.Query()[api.ExecCommandParamm], - streamOpts: *streamOpts, } } // getAttach handles requests to attach to a container. func (s *Server) getAttach(request *restful.Request, response *restful.Response) { params := getRequestParams(request) + streamOpts, err := remotecommand.NewOptions(request.Request) + if err != nil { + utilruntime.HandleError(err) + response.WriteError(http.StatusBadRequest, err) + return + } pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) @@ -606,7 +604,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) } podFullName := kubecontainer.GetPodFullName(pod) - redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, params.streamOpts) + redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts) if err != nil { response.WriteError(streaming.HTTPStatus(err), err) return @@ -622,6 +620,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) podFullName, params.podUID, params.containerName, + streamOpts, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout, remotecommand.SupportedStreamingProtocols) @@ -630,6 +629,12 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) // getExec handles requests to run a command inside a container. func (s *Server) getExec(request *restful.Request, response *restful.Response) { params := getRequestParams(request) + streamOpts, err := remotecommand.NewOptions(request.Request) + if err != nil { + utilruntime.HandleError(err) + response.WriteError(http.StatusBadRequest, err) + return + } pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) @@ -637,7 +642,7 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) { } podFullName := kubecontainer.GetPodFullName(pod) - redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, params.streamOpts) + redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) if err != nil { response.WriteError(streaming.HTTPStatus(err), err) return @@ -653,6 +658,8 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) { podFullName, params.podUID, params.containerName, + params.cmd, + streamOpts, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout, remotecommand.SupportedStreamingProtocols) diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 99bf3e335d4..3415cb7a751 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -1172,7 +1172,7 @@ func testExecAttach(t *testing.T, verb string) { {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols}, {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, - {responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb}, + {stdout: true, responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb}, } for i, test := range tests { diff --git a/pkg/kubelet/server/streaming/server.go b/pkg/kubelet/server/streaming/server.go index 88c0bd9698b..f1d2d0fa902 100644 --- a/pkg/kubelet/server/streaming/server.go +++ b/pkg/kubelet/server/streaming/server.go @@ -251,6 +251,13 @@ func (s *server) serveExec(req *restful.Request, resp *restful.Response) { return } + streamOpts, err := remotecommand.NewOptions(req.Request) + if err != nil { + resp.WriteError(http.StatusBadRequest, err) + return + } + cmd := req.Request.URL.Query()[api.ExecCommandParamm] + remotecommand.ServeExec( resp.ResponseWriter, req.Request, @@ -258,6 +265,8 @@ func (s *server) serveExec(req *restful.Request, resp *restful.Response) { "", // unused: podName "", // unusued: podUID containerID, + cmd, + streamOpts, s.config.StreamIdleTimeout, s.config.StreamCreationTimeout, s.config.SupportedProtocols) @@ -270,6 +279,12 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { return } + streamOpts, err := remotecommand.NewOptions(req.Request) + if err != nil { + resp.WriteError(http.StatusBadRequest, err) + return + } + remotecommand.ServeAttach( resp.ResponseWriter, req.Request, @@ -277,6 +292,7 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { "", // unused: podName "", // unusued: podUID containerID, + streamOpts, s.config.StreamIdleTimeout, s.config.StreamCreationTimeout, s.config.SupportedProtocols)