Merge pull request #123281 from seans3/remote-command-websocket-beta

RemoteCommand over WebSockets to Beta
This commit is contained in:
Kubernetes Prow Robot 2024-02-27 21:01:55 -08:00 committed by GitHub
commit f7ca532472
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 301 additions and 51 deletions

View File

@ -165,14 +165,6 @@ const (
// Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it.
CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth"
// owner: @seans3
// kep: http://kep.k8s.io/4006
// alpha: v1.29
//
// Enables StreamTranslator proxy to handle WebSockets upgrade requests for the
// version of the RemoteCommand subprotocol that supports the "close" signal.
TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests"
// owner: @nckturner
// kep: http://kep.k8s.io/2699
// alpha: v1.27
@ -808,6 +800,14 @@ const (
// Allow the usage of options to fine-tune the topology manager policies.
TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions"
// owner: @seans3
// kep: http://kep.k8s.io/4006
// beta: v1.30
//
// Enables StreamTranslator proxy to handle WebSockets upgrade requests for the
// version of the RemoteCommand subprotocol that supports the "close" signal.
TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests"
// owner: @richabanker
// alpha: v1.28
//
@ -972,8 +972,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31
TranslateStreamCloseWebsocketRequests: {Default: false, PreRelease: featuregate.Alpha},
CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha},
ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
@ -1142,6 +1140,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta},
TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta},
UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha},
VolumeAttributesClass: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -0,0 +1,61 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
subsystem = "apiserver"
statuscode = "code"
)
var registerMetricsOnce sync.Once
var (
// streamTranslatorRequestsTotal counts the number of requests that were handled by
// the StreamTranslatorProxy.
streamTranslatorRequestsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "stream_translator_requests_total",
Help: "Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5",
StabilityLevel: metrics.ALPHA,
},
[]string{statuscode},
)
)
func Register() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(streamTranslatorRequestsTotal)
})
}
func ResetForTest() {
streamTranslatorRequestsTotal.Reset()
}
// IncStreamTranslatorRequest increments the # of requests handled by the StreamTranslatorProxy.
func IncStreamTranslatorRequest(ctx context.Context, status string) {
streamTranslatorRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)
}

View File

@ -20,12 +20,14 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"github.com/mxk/go-flowrate/flowrate"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
constants "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apiserver/pkg/util/proxy/metrics"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/exec"
)
@ -61,6 +63,8 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
// to the client.
websocketStreams, err := webSocketServerStreams(req, w, h.Options)
if err != nil {
// Client error increments bad request status code.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusBadRequest))
return
}
defer websocketStreams.conn.Close()
@ -69,11 +73,13 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport})
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
return
}
spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location)
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
return
}
@ -115,10 +121,16 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
//nolint:errcheck // Ignore writeStatus returned error
if statusErr, ok := err.(*apierrors.StatusError); ok {
websocketStreams.writeStatus(statusErr)
// Increment status code returned within status error.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code)))
} else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() {
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
// Returned an exit code from the container, so not an error in
// stream translator--add StatusOK to metrics.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
} else {
websocketStreams.writeStatus(apierrors.NewInternalError(err))
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
}
return
}
@ -128,6 +140,7 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
}
// translatorSizeQueue feeds the size events from the WebSocket

View File

@ -41,9 +41,12 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
rcconstants "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/proxy/metrics"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
// TestStreamTranslator_LoopbackStdinToStdout returns random data sent on the client's
@ -53,6 +56,9 @@ import (
// websocket data into spdy). The returned data read on the websocket client STDOUT is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -132,6 +138,16 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_LoopbackStdinToStderr returns random data sent on the client's
@ -141,6 +157,9 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
// websocket data into spdy). The returned data read on the websocket client STDERR is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -219,6 +238,16 @@ func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) {
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// Returns a random exit code in the range(1-127).
@ -231,6 +260,9 @@ func randomExitCode() int {
// TestStreamTranslator_ErrorStream tests the error stream by sending an error with a random
// exit code, then validating the error arrives on the error stream.
func TestStreamTranslator_ErrorStream(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
expectedExitCode := randomExitCode()
// Create upstream fake SPDY server, returning a non-zero exit code
// on error stream within the structured error.
@ -321,11 +353,24 @@ func TestStreamTranslator_ErrorStream(t *testing.T) {
t.Errorf("expected error (%s), got (%s)", expectedError, err)
}
}
// Validate the streamtranslator metrics; an exit code error is considered 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_MultipleReadChannels tests two streams (STDOUT, STDERR) reading from
// the connections at the same time.
func TestStreamTranslator_MultipleReadChannels(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -417,6 +462,16 @@ func TestStreamTranslator_MultipleReadChannels(t *testing.T) {
if !bytes.Equal(stderrBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData))
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_ThrottleReadChannels tests two streams (STDOUT, STDERR) using rate limited streams.
@ -556,6 +611,9 @@ func randomTerminalSize() remotecommand.TerminalSize {
// TestStreamTranslator_MultipleWriteChannels
func TestStreamTranslator_TTYResizeChannel(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create the fake terminal size queue and the actualTerminalSizes which
// will be received at the opposite websocket endpoint.
numSizeQueue := 10000
@ -633,11 +691,24 @@ func TestStreamTranslator_TTYResizeChannel(t *testing.T) {
t.Errorf("expected terminal resize window %v, got %v", expected, actual)
}
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_WebSocketServerErrors validates that when there is a problem creating
// the websocket server as the first step of the StreamTranslator an error is properly returned.
func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
spdyLocation, err := url.Parse("http://127.0.0.1")
if err != nil {
t.Fatalf("Unable to parse spdy server URL")
@ -684,11 +755,24 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
t.Errorf("expected websocket bad handshake error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="400"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow
// redirects; it will thrown an error instead.
func TestStreamTranslator_BlockRedirects(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
for _, statusCode := range []int{
http.StatusMovedPermanently, // 301
http.StatusFound, // 302
@ -744,6 +828,17 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) {
t.Errorf("expected redirect not allowed error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure each loop.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="500"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
metrics.ResetForTest() // Clear metrics each loop
}
}

View File

@ -20,9 +20,9 @@ import (
"context"
)
var _ Executor = &fallbackExecutor{}
var _ Executor = &FallbackExecutor{}
type fallbackExecutor struct {
type FallbackExecutor struct {
primary Executor
secondary Executor
shouldFallback func(error) bool
@ -33,7 +33,7 @@ type fallbackExecutor struct {
// websocket "StreamWithContext" call fails.
// func NewFallbackExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) bool) (Executor, error) {
return &fallbackExecutor{
return &FallbackExecutor{
primary: primary,
secondary: secondary,
shouldFallback: shouldFallback,
@ -41,14 +41,14 @@ func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error)
}
// Stream is deprecated. Please use "StreamWithContext".
func (f *fallbackExecutor) Stream(options StreamOptions) error {
func (f *FallbackExecutor) Stream(options StreamOptions) error {
return f.StreamWithContext(context.Background(), options)
}
// StreamWithContext initially attempts to call "StreamWithContext" using the
// primary executor, falling back to calling the secondary executor if the
// initial primary call to upgrade to a websocket connection fails.
func (f *fallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
err := f.primary.StreamWithContext(ctx, options)
if f.shouldFallback(err) {
return f.secondary.StreamWithContext(ctx, options)

View File

@ -193,8 +193,8 @@ func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return true })
require.NoError(t, err)
// Update the websocket executor to request remote command v4, which is unsupported.
fallbackExec, ok := exec.(*fallbackExecutor)
assert.True(t, ok, "error casting executor as fallbackExecutor")
fallbackExec, ok := exec.(*FallbackExecutor)
assert.True(t, ok, "error casting executor as FallbackExecutor")
websocketExec, ok := fallbackExec.primary.(*wsStreamExecutor)
assert.True(t, ok, "error casting executor as websocket executor")
// Set the attempted subprotocol version to V4; websocket server only accepts V5.

View File

@ -158,23 +158,10 @@ type DefaultRemoteAttach struct{}
// Attach executes attach to a running container
func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
exec, err := createExecutor(url, config)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
@ -184,6 +171,27 @@ func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdi
})
}
// createExecutor returns the Executor or an error if one occurred.
func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) {
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return nil, err
}
// Fallback executor is default, unless feature flag is explicitly disabled.
if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return nil, err
}
}
return exec, nil
}
// Complete verifies command line arguments and loads data from the command environment
func (o *AttachOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
var err error

View File

@ -37,6 +37,7 @@ import (
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/cmd/exec"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/cmd/util/podcmd"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/scheme"
@ -553,3 +554,37 @@ func TestReattachMessage(t *testing.T) {
})
}
}
func TestCreateExecutor(t *testing.T) {
url, err := url.Parse("http://localhost:8080/index.html")
if err != nil {
t.Fatalf("unable to parse test url: %v", err)
}
config := cmdtesting.DefaultClientConfig()
// First, ensure that no environment variable creates the fallback executor.
executor, err := createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Next, check turning on feature flag explicitly also creates fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Finally, check explicit disabling does NOT create the fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
}

View File

@ -121,23 +121,10 @@ type RemoteExecutor interface {
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
exec, err := createExecutor(url, config)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
@ -147,6 +134,27 @@ func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, s
})
}
// createExecutor returns the Executor or an error if one occurred.
func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) {
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return nil, err
}
// Fallback executor is default, unless feature flag is explicitly disabled.
if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return nil, err
}
}
return exec, nil
}
type StreamOptions struct {
Namespace string
PodName string

View File

@ -33,8 +33,8 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
"k8s.io/client-go/tools/remotecommand"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util/term"
)
@ -402,3 +402,37 @@ func TestSetupTTY(t *testing.T) {
t.Errorf("attach stdin, TTY, is a terminal: tty.Out should equal o.Out")
}
}
func TestCreateExecutor(t *testing.T) {
url, err := url.Parse("http://localhost:8080/index.html")
if err != nil {
t.Fatalf("unable to parse test url: %v", err)
}
config := cmdtesting.DefaultClientConfig()
// First, ensure that no environment variable creates the fallback executor.
executor, err := createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Next, check turning on feature flag explicitly also creates fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Finally, check explicit disabling does NOT create the fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
}

View File

@ -42,8 +42,6 @@ import (
"sigs.k8s.io/yaml"
utilkubectl "k8s.io/kubectl/pkg/cmd/util"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -818,7 +816,6 @@ metadata:
// We wait for a non-empty line so we know kubectl has attached
e2ekubectl.NewKubectlCommand(ns, "run", "run-test", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--stdin", "--", "sh", "-c", "echo -n read: && cat && echo 'stdin closed'").
WithStdinData("value\nabcd1234").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput := waitForStdinContent("run-test", "stdin closed")
@ -836,7 +833,6 @@ metadata:
// to the container, this does not solve the race though.
e2ekubectl.NewKubectlCommand(ns, "run", "run-test-2", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--", "sh", "-c", "cat && echo 'stdin closed'").
WithStdinData("abcd1234").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput = waitForStdinContent("run-test-2", "stdin closed")
@ -848,7 +844,6 @@ metadata:
ginkgo.By("executing a command with run and attach with stdin with open stdin should remain running")
e2ekubectl.NewKubectlCommand(ns, "run", "run-test-3", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--stdin", "--", "sh", "-c", "cat && echo 'stdin closed'").
WithStdinData("abcd1234\n").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput = waitForStdinContent("run-test-3", "abcd1234")

1
vendor/modules.txt vendored
View File

@ -1525,6 +1525,7 @@ k8s.io/apiserver/pkg/util/openapi
k8s.io/apiserver/pkg/util/peerproxy
k8s.io/apiserver/pkg/util/peerproxy/metrics
k8s.io/apiserver/pkg/util/proxy
k8s.io/apiserver/pkg/util/proxy/metrics
k8s.io/apiserver/pkg/util/shufflesharding
k8s.io/apiserver/pkg/util/webhook
k8s.io/apiserver/pkg/util/x509metrics