mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Merge pull request #38435 from timstclair/remotecmd-refactor
Automatic merge from submit-queue Refactor remotecommand options parsing Prerequisite to https://github.com/kubernetes/kubernetes/issues/36187 - This separates the options from the request, so they can be pulled from elsewhere. /cc @liggitt
This commit is contained in:
commit
c40404ae9a
@ -54,5 +54,6 @@ go_test(
|
|||||||
"//pkg/util/httpstream:go_default_library",
|
"//pkg/util/httpstream:go_default_library",
|
||||||
"//pkg/util/term:go_default_library",
|
"//pkg/util/term:go_default_library",
|
||||||
"//pkg/util/wait:go_default_library",
|
"//pkg/util/wait:go_default_library",
|
||||||
|
"//vendor:github.com/stretchr/testify/require",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -29,6 +29,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/client/restclient"
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
@ -118,10 +120,13 @@ func fakeServer(t *testing.T, testName string, exec bool, stdinData, stdoutData,
|
|||||||
exec: exec,
|
exec: exec,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts, err := remotecommand.NewOptions(req)
|
||||||
|
require.NoError(t, err)
|
||||||
if exec {
|
if exec {
|
||||||
remotecommand.ServeExec(w, req, executor, "pod", "uid", "container", 0, 10*time.Second, serverProtocols)
|
cmd := req.URL.Query()[api.ExecCommandParamm]
|
||||||
|
remotecommand.ServeExec(w, req, executor, "pod", "uid", "container", cmd, opts, 0, 10*time.Second, serverProtocols)
|
||||||
} else {
|
} else {
|
||||||
remotecommand.ServeAttach(w, req, executor, "pod", "uid", "container", 0, 10*time.Second, serverProtocols)
|
remotecommand.ServeAttach(w, req, executor, "pod", "uid", "container", opts, 0, 10*time.Second, serverProtocols)
|
||||||
}
|
}
|
||||||
|
|
||||||
if e, a := strings.Repeat(stdinData, messageCount), executor.stdinReceived.String(); e != a {
|
if e, a := strings.Repeat(stdinData, messageCount), executor.stdinReceived.String(); e != a {
|
||||||
|
@ -39,6 +39,7 @@ go_library(
|
|||||||
"//pkg/util/configz:go_default_library",
|
"//pkg/util/configz:go_default_library",
|
||||||
"//pkg/util/flushwriter:go_default_library",
|
"//pkg/util/flushwriter:go_default_library",
|
||||||
"//pkg/util/limitwriter:go_default_library",
|
"//pkg/util/limitwriter:go_default_library",
|
||||||
|
"//pkg/util/runtime:go_default_library",
|
||||||
"//pkg/util/term:go_default_library",
|
"//pkg/util/term:go_default_library",
|
||||||
"//pkg/volume:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
"//vendor:github.com/emicklei/go-restful",
|
"//vendor:github.com/emicklei/go-restful",
|
||||||
|
@ -38,8 +38,8 @@ type Attacher interface {
|
|||||||
|
|
||||||
// ServeAttach handles requests to attach to a container. After creating/receiving the required
|
// ServeAttach handles requests to attach to a container. After creating/receiving the required
|
||||||
// streams, it delegates the actual attaching to attacher.
|
// streams, it delegates the actual attaching to attacher.
|
||||||
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
||||||
ctx, ok := createStreams(req, w, supportedProtocols, idleTimeout, streamCreationTimeout)
|
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
// error is handled by createStreams
|
// error is handled by createStreams
|
||||||
return
|
return
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
@ -46,16 +45,14 @@ type Executor interface {
|
|||||||
// ServeExec handles requests to execute a command in a container. After
|
// ServeExec handles requests to execute a command in a container. After
|
||||||
// creating/receiving the required streams, it delegates the actual execution
|
// creating/receiving the required streams, it delegates the actual execution
|
||||||
// to the executor.
|
// to the executor.
|
||||||
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
|
||||||
ctx, ok := createStreams(req, w, supportedProtocols, idleTimeout, streamCreationTimeout)
|
ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
|
||||||
if !ok {
|
if !ok {
|
||||||
// error is handled by createStreams
|
// error is handled by createStreams
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer ctx.conn.Close()
|
defer ctx.conn.Close()
|
||||||
|
|
||||||
cmd := req.URL.Query()[api.ExecCommandParamm]
|
|
||||||
|
|
||||||
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
|
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
|
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
|
||||||
|
@ -43,7 +43,6 @@ type Options struct {
|
|||||||
Stdout bool
|
Stdout bool
|
||||||
Stderr bool
|
Stderr bool
|
||||||
TTY bool
|
TTY bool
|
||||||
expectedStreams int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions creates a new Options from the Request.
|
// NewOptions creates a new Options from the Request.
|
||||||
@ -58,19 +57,7 @@ func NewOptions(req *http.Request) (*Options, error) {
|
|||||||
stderr = false
|
stderr = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// count the streams client asked for, starting with 1
|
if !stdin && !stdout && !stderr {
|
||||||
expectedStreams := 1
|
|
||||||
if stdin {
|
|
||||||
expectedStreams++
|
|
||||||
}
|
|
||||||
if stdout {
|
|
||||||
expectedStreams++
|
|
||||||
}
|
|
||||||
if stderr {
|
|
||||||
expectedStreams++
|
|
||||||
}
|
|
||||||
|
|
||||||
if expectedStreams == 1 {
|
|
||||||
return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
|
return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +66,6 @@ func NewOptions(req *http.Request) (*Options, error) {
|
|||||||
Stdout: stdout,
|
Stdout: stdout,
|
||||||
Stderr: stderr,
|
Stderr: stderr,
|
||||||
TTY: tty,
|
TTY: tty,
|
||||||
expectedStreams: expectedStreams,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,15 +101,7 @@ func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
|
func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
|
||||||
opts, err := NewOptions(req)
|
|
||||||
if err != nil {
|
|
||||||
runtime.HandleError(err)
|
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
|
||||||
fmt.Fprint(w, err.Error())
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
var ctx *context
|
var ctx *context
|
||||||
var ok bool
|
var ok bool
|
||||||
if wsstream.IsWebSocketRequest(req) {
|
if wsstream.IsWebSocketRequest(req) {
|
||||||
@ -183,14 +161,25 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Opt
|
|||||||
handler = &v1ProtocolHandler{}
|
handler = &v1ProtocolHandler{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// count the streams client asked for, starting with 1
|
||||||
|
expectedStreams := 1
|
||||||
|
if opts.Stdin {
|
||||||
|
expectedStreams++
|
||||||
|
}
|
||||||
|
if opts.Stdout {
|
||||||
|
expectedStreams++
|
||||||
|
}
|
||||||
|
if opts.Stderr {
|
||||||
|
expectedStreams++
|
||||||
|
}
|
||||||
if opts.TTY && handler.supportsTerminalResizing() {
|
if opts.TTY && handler.supportsTerminalResizing() {
|
||||||
opts.expectedStreams++
|
expectedStreams++
|
||||||
}
|
}
|
||||||
|
|
||||||
expired := time.NewTimer(streamCreationTimeout)
|
expired := time.NewTimer(streamCreationTimeout)
|
||||||
defer expired.Stop()
|
defer expired.Stop()
|
||||||
|
|
||||||
ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C)
|
ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
runtime.HandleError(err)
|
runtime.HandleError(err)
|
||||||
return nil, false
|
return nil, false
|
||||||
|
@ -56,6 +56,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/util/configz"
|
"k8s.io/kubernetes/pkg/util/configz"
|
||||||
"k8s.io/kubernetes/pkg/util/flushwriter"
|
"k8s.io/kubernetes/pkg/util/flushwriter"
|
||||||
"k8s.io/kubernetes/pkg/util/limitwriter"
|
"k8s.io/kubernetes/pkg/util/limitwriter"
|
||||||
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/term"
|
"k8s.io/kubernetes/pkg/util/term"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
)
|
)
|
||||||
@ -575,30 +576,27 @@ type requestParams struct {
|
|||||||
podUID types.UID
|
podUID types.UID
|
||||||
containerName string
|
containerName string
|
||||||
cmd []string
|
cmd []string
|
||||||
streamOpts remotecommand.Options
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRequestParams(req *restful.Request) requestParams {
|
func getRequestParams(req *restful.Request) requestParams {
|
||||||
streamOpts, err := remotecommand.NewOptions(req.Request)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("Unable to parse request stream options: %v", err)
|
|
||||||
}
|
|
||||||
if streamOpts == nil {
|
|
||||||
streamOpts = &remotecommand.Options{}
|
|
||||||
}
|
|
||||||
return requestParams{
|
return requestParams{
|
||||||
podNamespace: req.PathParameter("podNamespace"),
|
podNamespace: req.PathParameter("podNamespace"),
|
||||||
podName: req.PathParameter("podID"),
|
podName: req.PathParameter("podID"),
|
||||||
podUID: types.UID(req.PathParameter("uid")),
|
podUID: types.UID(req.PathParameter("uid")),
|
||||||
containerName: req.PathParameter("containerName"),
|
containerName: req.PathParameter("containerName"),
|
||||||
cmd: req.Request.URL.Query()[api.ExecCommandParamm],
|
cmd: req.Request.URL.Query()[api.ExecCommandParamm],
|
||||||
streamOpts: *streamOpts,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getAttach handles requests to attach to a container.
|
// getAttach handles requests to attach to a container.
|
||||||
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
|
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
|
||||||
params := getRequestParams(request)
|
params := getRequestParams(request)
|
||||||
|
streamOpts, err := remotecommand.NewOptions(request.Request)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
response.WriteError(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
||||||
if !ok {
|
if !ok {
|
||||||
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
@ -606,7 +604,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
|
|||||||
}
|
}
|
||||||
|
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, params.streamOpts)
|
redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.WriteError(streaming.HTTPStatus(err), err)
|
response.WriteError(streaming.HTTPStatus(err), err)
|
||||||
return
|
return
|
||||||
@ -622,6 +620,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
|
|||||||
podFullName,
|
podFullName,
|
||||||
params.podUID,
|
params.podUID,
|
||||||
params.containerName,
|
params.containerName,
|
||||||
|
streamOpts,
|
||||||
s.host.StreamingConnectionIdleTimeout(),
|
s.host.StreamingConnectionIdleTimeout(),
|
||||||
remotecommand.DefaultStreamCreationTimeout,
|
remotecommand.DefaultStreamCreationTimeout,
|
||||||
remotecommand.SupportedStreamingProtocols)
|
remotecommand.SupportedStreamingProtocols)
|
||||||
@ -630,6 +629,12 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
|
|||||||
// getExec handles requests to run a command inside a container.
|
// getExec handles requests to run a command inside a container.
|
||||||
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
||||||
params := getRequestParams(request)
|
params := getRequestParams(request)
|
||||||
|
streamOpts, err := remotecommand.NewOptions(request.Request)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
response.WriteError(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
|
||||||
if !ok {
|
if !ok {
|
||||||
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
@ -637,7 +642,7 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, params.streamOpts)
|
redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
response.WriteError(streaming.HTTPStatus(err), err)
|
response.WriteError(streaming.HTTPStatus(err), err)
|
||||||
return
|
return
|
||||||
@ -653,6 +658,8 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
|||||||
podFullName,
|
podFullName,
|
||||||
params.podUID,
|
params.podUID,
|
||||||
params.containerName,
|
params.containerName,
|
||||||
|
params.cmd,
|
||||||
|
streamOpts,
|
||||||
s.host.StreamingConnectionIdleTimeout(),
|
s.host.StreamingConnectionIdleTimeout(),
|
||||||
remotecommand.DefaultStreamCreationTimeout,
|
remotecommand.DefaultStreamCreationTimeout,
|
||||||
remotecommand.SupportedStreamingProtocols)
|
remotecommand.SupportedStreamingProtocols)
|
||||||
|
@ -1172,7 +1172,7 @@ func testExecAttach(t *testing.T, verb string) {
|
|||||||
{stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
{stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||||
{stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
|
{stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||||
{stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
{stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
|
||||||
{responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb},
|
{stdout: true, responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
|
@ -251,6 +251,13 @@ func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamOpts, err := remotecommand.NewOptions(req.Request)
|
||||||
|
if err != nil {
|
||||||
|
resp.WriteError(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cmd := req.Request.URL.Query()[api.ExecCommandParamm]
|
||||||
|
|
||||||
remotecommand.ServeExec(
|
remotecommand.ServeExec(
|
||||||
resp.ResponseWriter,
|
resp.ResponseWriter,
|
||||||
req.Request,
|
req.Request,
|
||||||
@ -258,6 +265,8 @@ func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
|
|||||||
"", // unused: podName
|
"", // unused: podName
|
||||||
"", // unusued: podUID
|
"", // unusued: podUID
|
||||||
containerID,
|
containerID,
|
||||||
|
cmd,
|
||||||
|
streamOpts,
|
||||||
s.config.StreamIdleTimeout,
|
s.config.StreamIdleTimeout,
|
||||||
s.config.StreamCreationTimeout,
|
s.config.StreamCreationTimeout,
|
||||||
s.config.SupportedProtocols)
|
s.config.SupportedProtocols)
|
||||||
@ -270,6 +279,12 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamOpts, err := remotecommand.NewOptions(req.Request)
|
||||||
|
if err != nil {
|
||||||
|
resp.WriteError(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
remotecommand.ServeAttach(
|
remotecommand.ServeAttach(
|
||||||
resp.ResponseWriter,
|
resp.ResponseWriter,
|
||||||
req.Request,
|
req.Request,
|
||||||
@ -277,6 +292,7 @@ func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
|
|||||||
"", // unused: podName
|
"", // unused: podName
|
||||||
"", // unusued: podUID
|
"", // unusued: podUID
|
||||||
containerID,
|
containerID,
|
||||||
|
streamOpts,
|
||||||
s.config.StreamIdleTimeout,
|
s.config.StreamIdleTimeout,
|
||||||
s.config.StreamCreationTimeout,
|
s.config.StreamCreationTimeout,
|
||||||
s.config.SupportedProtocols)
|
s.config.SupportedProtocols)
|
||||||
|
Loading…
Reference in New Issue
Block a user