remote command turn on feature gates

This commit is contained in:
Sean Sullivan 2024-02-13 14:10:40 -08:00 committed by Sean Sullivan
parent a882a2bf50
commit a147693deb
8 changed files with 122 additions and 42 deletions

View File

@ -971,7 +971,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31
TranslateStreamCloseWebsocketRequests: {Default: false, PreRelease: featuregate.Alpha},
TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta},
CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -20,9 +20,9 @@ import (
"context"
)
var _ Executor = &fallbackExecutor{}
var _ Executor = &FallbackExecutor{}
type fallbackExecutor struct {
type FallbackExecutor struct {
primary Executor
secondary Executor
shouldFallback func(error) bool
@ -33,7 +33,7 @@ type fallbackExecutor struct {
// websocket "StreamWithContext" call fails.
// func NewFallbackExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) bool) (Executor, error) {
return &fallbackExecutor{
return &FallbackExecutor{
primary: primary,
secondary: secondary,
shouldFallback: shouldFallback,
@ -41,14 +41,14 @@ func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error)
}
// Stream is deprecated. Please use "StreamWithContext".
func (f *fallbackExecutor) Stream(options StreamOptions) error {
func (f *FallbackExecutor) Stream(options StreamOptions) error {
return f.StreamWithContext(context.Background(), options)
}
// StreamWithContext initially attempts to call "StreamWithContext" using the
// primary executor, falling back to calling the secondary executor if the
// initial primary call to upgrade to a websocket connection fails.
func (f *fallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
err := f.primary.StreamWithContext(ctx, options)
if f.shouldFallback(err) {
return f.secondary.StreamWithContext(ctx, options)

View File

@ -193,8 +193,8 @@ func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return true })
require.NoError(t, err)
// Update the websocket executor to request remote command v4, which is unsupported.
fallbackExec, ok := exec.(*fallbackExecutor)
assert.True(t, ok, "error casting executor as fallbackExecutor")
fallbackExec, ok := exec.(*FallbackExecutor)
assert.True(t, ok, "error casting executor as FallbackExecutor")
websocketExec, ok := fallbackExec.primary.(*wsStreamExecutor)
assert.True(t, ok, "error casting executor as websocket executor")
// Set the attempted subprotocol version to V4; websocket server only accepts V5.

View File

@ -158,23 +158,10 @@ type DefaultRemoteAttach struct{}
// Attach executes attach to a running container
func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
exec, err := createExecutor(url, config)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
@ -184,6 +171,27 @@ func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdi
})
}
// createExecutor returns the Executor or an error if one occurred.
func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) {
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return nil, err
}
// Fallback executor is default, unless feature flag is explicitly disabled.
if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return nil, err
}
}
return exec, nil
}
// Complete verifies command line arguments and loads data from the command environment
func (o *AttachOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
var err error

View File

@ -37,6 +37,7 @@ import (
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/cmd/exec"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/cmd/util/podcmd"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/scheme"
@ -553,3 +554,37 @@ func TestReattachMessage(t *testing.T) {
})
}
}
func TestCreateExecutor(t *testing.T) {
url, err := url.Parse("http://localhost:8080/index.html")
if err != nil {
t.Fatalf("unable to parse test url: %v", err)
}
config := cmdtesting.DefaultClientConfig()
// First, ensure that no environment variable creates the fallback executor.
executor, err := createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Next, check turning on feature flag explicitly also creates fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Finally, check explicit disabling does NOT create the fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
}

View File

@ -121,23 +121,10 @@ type RemoteExecutor interface {
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
exec, err := createExecutor(url, config)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
@ -147,6 +134,27 @@ func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, s
})
}
// createExecutor returns the Executor or an error if one occurred.
func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) {
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return nil, err
}
// Fallback executor is default, unless feature flag is explicitly disabled.
if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return nil, err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return nil, err
}
}
return exec, nil
}
type StreamOptions struct {
Namespace string
PodName string

View File

@ -33,8 +33,8 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
"k8s.io/client-go/tools/remotecommand"
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubectl/pkg/util/term"
)
@ -402,3 +402,37 @@ func TestSetupTTY(t *testing.T) {
t.Errorf("attach stdin, TTY, is a terminal: tty.Out should equal o.Out")
}
}
func TestCreateExecutor(t *testing.T) {
url, err := url.Parse("http://localhost:8080/index.html")
if err != nil {
t.Fatalf("unable to parse test url: %v", err)
}
config := cmdtesting.DefaultClientConfig()
// First, ensure that no environment variable creates the fallback executor.
executor, err := createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Next, check turning on feature flag explicitly also creates fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
// Finally, check explicit disabling does NOT create the fallback executor.
t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false")
executor, err = createExecutor(url, config)
if err != nil {
t.Fatalf("unable to create executor: %v", err)
}
if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback {
t.Errorf("expected fallback executor, got %#v", executor)
}
}

View File

@ -42,8 +42,6 @@ import (
"sigs.k8s.io/yaml"
utilkubectl "k8s.io/kubectl/pkg/cmd/util"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -818,7 +816,6 @@ metadata:
// We wait for a non-empty line so we know kubectl has attached
e2ekubectl.NewKubectlCommand(ns, "run", "run-test", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--stdin", "--", "sh", "-c", "echo -n read: && cat && echo 'stdin closed'").
WithStdinData("value\nabcd1234").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput := waitForStdinContent("run-test", "stdin closed")
@ -836,7 +833,6 @@ metadata:
// to the container, this does not solve the race though.
e2ekubectl.NewKubectlCommand(ns, "run", "run-test-2", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--", "sh", "-c", "cat && echo 'stdin closed'").
WithStdinData("abcd1234").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput = waitForStdinContent("run-test-2", "stdin closed")
@ -848,7 +844,6 @@ metadata:
ginkgo.By("executing a command with run and attach with stdin with open stdin should remain running")
e2ekubectl.NewKubectlCommand(ns, "run", "run-test-3", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--stdin", "--", "sh", "-c", "cat && echo 'stdin closed'").
WithStdinData("abcd1234\n").
AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}).
ExecOrDie(ns)
runOutput = waitForStdinContent("run-test-3", "abcd1234")