From a147693deb2e7f040cf367aae4a7ae5d1cb3e7aa Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Tue, 13 Feb 2024 14:10:40 -0800 Subject: [PATCH 1/2] remote command turn on feature gates --- pkg/features/kube_features.go | 2 +- .../client-go/tools/remotecommand/fallback.go | 10 +++--- .../tools/remotecommand/fallback_test.go | 4 +-- .../k8s.io/kubectl/pkg/cmd/attach/attach.go | 36 +++++++++++-------- .../kubectl/pkg/cmd/attach/attach_test.go | 35 ++++++++++++++++++ .../src/k8s.io/kubectl/pkg/cmd/exec/exec.go | 36 +++++++++++-------- .../k8s.io/kubectl/pkg/cmd/exec/exec_test.go | 36 ++++++++++++++++++- test/e2e/kubectl/kubectl.go | 5 --- 8 files changed, 122 insertions(+), 42 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 4de8cf5fcd7..0551bc2a34a 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -971,7 +971,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31 - TranslateStreamCloseWebsocketRequests: {Default: false, PreRelease: featuregate.Alpha}, + TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta}, CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/fallback.go b/staging/src/k8s.io/client-go/tools/remotecommand/fallback.go index 4846cdb5509..3efde3c5883 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/fallback.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/fallback.go @@ -20,9 +20,9 @@ import ( "context" ) -var _ Executor = &fallbackExecutor{} +var _ Executor = &FallbackExecutor{} -type fallbackExecutor struct { +type FallbackExecutor struct { primary Executor secondary Executor shouldFallback func(error) bool @@ -33,7 +33,7 @@ type fallbackExecutor struct { // websocket "StreamWithContext" call fails. // func NewFallbackExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) { func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) bool) (Executor, error) { - return &fallbackExecutor{ + return &FallbackExecutor{ primary: primary, secondary: secondary, shouldFallback: shouldFallback, @@ -41,14 +41,14 @@ func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) } // Stream is deprecated. Please use "StreamWithContext". -func (f *fallbackExecutor) Stream(options StreamOptions) error { +func (f *FallbackExecutor) Stream(options StreamOptions) error { return f.StreamWithContext(context.Background(), options) } // StreamWithContext initially attempts to call "StreamWithContext" using the // primary executor, falling back to calling the secondary executor if the // initial primary call to upgrade to a websocket connection fails. -func (f *fallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { +func (f *FallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error { err := f.primary.StreamWithContext(ctx, options) if f.shouldFallback(err) { return f.secondary.StreamWithContext(ctx, options) diff --git a/staging/src/k8s.io/client-go/tools/remotecommand/fallback_test.go b/staging/src/k8s.io/client-go/tools/remotecommand/fallback_test.go index 70049857050..52e1f7b160d 100644 --- a/staging/src/k8s.io/client-go/tools/remotecommand/fallback_test.go +++ b/staging/src/k8s.io/client-go/tools/remotecommand/fallback_test.go @@ -193,8 +193,8 @@ func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) { exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(error) bool { return true }) require.NoError(t, err) // Update the websocket executor to request remote command v4, which is unsupported. - fallbackExec, ok := exec.(*fallbackExecutor) - assert.True(t, ok, "error casting executor as fallbackExecutor") + fallbackExec, ok := exec.(*FallbackExecutor) + assert.True(t, ok, "error casting executor as FallbackExecutor") websocketExec, ok := fallbackExec.primary.(*wsStreamExecutor) assert.True(t, ok, "error casting executor as websocket executor") // Set the attempted subprotocol version to V4; websocket server only accepts V5. diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go index af25e072941..c1e8d83dc54 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go @@ -158,23 +158,10 @@ type DefaultRemoteAttach struct{} // Attach executes attach to a running container func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - // Legacy SPDY executor is default. If feature gate enabled, fallback - // executor attempts websockets first--then SPDY. - exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) + exec, err := createExecutor(url, config) if err != nil { return err } - if cmdutil.RemoteCommandWebsockets.IsEnabled() { - // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). - websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) - if err != nil { - return err - } - exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) - if err != nil { - return err - } - } return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, @@ -184,6 +171,27 @@ func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdi }) } +// createExecutor returns the Executor or an error if one occurred. +func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) { + exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) + if err != nil { + return nil, err + } + // Fallback executor is default, unless feature flag is explicitly disabled. + if !cmdutil.RemoteCommandWebsockets.IsDisabled() { + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) + if err != nil { + return nil, err + } + exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) + if err != nil { + return nil, err + } + } + return exec, nil +} + // Complete verifies command line arguments and loads data from the command environment func (o *AttachOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error { var err error diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach_test.go b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach_test.go index 6d491323ebb..ac217e08cb9 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach_test.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/attach/attach_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/remotecommand" "k8s.io/kubectl/pkg/cmd/exec" cmdtesting "k8s.io/kubectl/pkg/cmd/testing" + cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/cmd/util/podcmd" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/scheme" @@ -553,3 +554,37 @@ func TestReattachMessage(t *testing.T) { }) } } + +func TestCreateExecutor(t *testing.T) { + url, err := url.Parse("http://localhost:8080/index.html") + if err != nil { + t.Fatalf("unable to parse test url: %v", err) + } + config := cmdtesting.DefaultClientConfig() + // First, ensure that no environment variable creates the fallback executor. + executor, err := createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } + // Next, check turning on feature flag explicitly also creates fallback executor. + t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true") + executor, err = createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } + // Finally, check explicit disabling does NOT create the fallback executor. + t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false") + executor, err = createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } +} diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go index 36d43beceb9..40580766418 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec.go @@ -121,23 +121,10 @@ type RemoteExecutor interface { type DefaultRemoteExecutor struct{} func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - // Legacy SPDY executor is default. If feature gate enabled, fallback - // executor attempts websockets first--then SPDY. - exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) + exec, err := createExecutor(url, config) if err != nil { return err } - if cmdutil.RemoteCommandWebsockets.IsEnabled() { - // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). - websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) - if err != nil { - return err - } - exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) - if err != nil { - return err - } - } return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, @@ -147,6 +134,27 @@ func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, s }) } +// createExecutor returns the Executor or an error if one occurred. +func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) { + exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) + if err != nil { + return nil, err + } + // Fallback executor is default, unless feature flag is explicitly disabled. + if !cmdutil.RemoteCommandWebsockets.IsDisabled() { + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) + if err != nil { + return nil, err + } + exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) + if err != nil { + return nil, err + } + } + return exec, nil +} + type StreamOptions struct { Namespace string PodName string diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec_test.go b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec_test.go index 7305231f129..af8b9099df8 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec_test.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/exec/exec_test.go @@ -33,8 +33,8 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/rest/fake" "k8s.io/client-go/tools/remotecommand" - cmdtesting "k8s.io/kubectl/pkg/cmd/testing" + cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/scheme" "k8s.io/kubectl/pkg/util/term" ) @@ -402,3 +402,37 @@ func TestSetupTTY(t *testing.T) { t.Errorf("attach stdin, TTY, is a terminal: tty.Out should equal o.Out") } } + +func TestCreateExecutor(t *testing.T) { + url, err := url.Parse("http://localhost:8080/index.html") + if err != nil { + t.Fatalf("unable to parse test url: %v", err) + } + config := cmdtesting.DefaultClientConfig() + // First, ensure that no environment variable creates the fallback executor. + executor, err := createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } + // Next, check turning on feature flag explicitly also creates fallback executor. + t.Setenv(string(cmdutil.RemoteCommandWebsockets), "true") + executor, err = createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); !isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } + // Finally, check explicit disabling does NOT create the fallback executor. + t.Setenv(string(cmdutil.RemoteCommandWebsockets), "false") + executor, err = createExecutor(url, config) + if err != nil { + t.Fatalf("unable to create executor: %v", err) + } + if _, isFallback := executor.(*remotecommand.FallbackExecutor); isFallback { + t.Errorf("expected fallback executor, got %#v", executor) + } +} diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 9258b4d9d3f..f1affb9be09 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -42,8 +42,6 @@ import ( "sigs.k8s.io/yaml" - utilkubectl "k8s.io/kubectl/pkg/cmd/util" - v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -818,7 +816,6 @@ metadata: // We wait for a non-empty line so we know kubectl has attached e2ekubectl.NewKubectlCommand(ns, "run", "run-test", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--stdin", "--", "sh", "-c", "echo -n read: && cat && echo 'stdin closed'"). WithStdinData("value\nabcd1234"). - AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}). ExecOrDie(ns) runOutput := waitForStdinContent("run-test", "stdin closed") @@ -836,7 +833,6 @@ metadata: // to the container, this does not solve the race though. e2ekubectl.NewKubectlCommand(ns, "run", "run-test-2", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--", "sh", "-c", "cat && echo 'stdin closed'"). WithStdinData("abcd1234"). - AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}). ExecOrDie(ns) runOutput = waitForStdinContent("run-test-2", "stdin closed") @@ -848,7 +844,6 @@ metadata: ginkgo.By("executing a command with run and attach with stdin with open stdin should remain running") e2ekubectl.NewKubectlCommand(ns, "run", "run-test-3", "--image="+busyboxImage, "--restart=OnFailure", podRunningTimeoutArg, "--attach=true", "--leave-stdin-open=true", "--stdin", "--", "sh", "-c", "cat && echo 'stdin closed'"). WithStdinData("abcd1234\n"). - AppendEnv([]string{string(utilkubectl.RemoteCommandWebsockets), "true"}). ExecOrDie(ns) runOutput = waitForStdinContent("run-test-3", "abcd1234") From 03812ddb169725b0652744c2ecaa151f5c03887b Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Sat, 24 Feb 2024 03:55:17 +0000 Subject: [PATCH 2/2] streamtranslator counter metric by status code --- pkg/features/kube_features.go | 20 ++-- .../pkg/util/proxy/metrics/metrics.go | 61 ++++++++++++ .../pkg/util/proxy/streamtranslator.go | 13 +++ .../pkg/util/proxy/streamtranslator_test.go | 95 +++++++++++++++++++ vendor/modules.txt | 1 + 5 files changed, 180 insertions(+), 10 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 0551bc2a34a..06aa971a260 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -165,14 +165,6 @@ const ( // Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it. CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth" - // owner: @seans3 - // kep: http://kep.k8s.io/4006 - // alpha: v1.29 - // - // Enables StreamTranslator proxy to handle WebSockets upgrade requests for the - // version of the RemoteCommand subprotocol that supports the "close" signal. - TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests" - // owner: @nckturner // kep: http://kep.k8s.io/2699 // alpha: v1.27 @@ -808,6 +800,14 @@ const ( // Allow the usage of options to fine-tune the topology manager policies. TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions" + // owner: @seans3 + // kep: http://kep.k8s.io/4006 + // beta: v1.30 + // + // Enables StreamTranslator proxy to handle WebSockets upgrade requests for the + // version of the RemoteCommand subprotocol that supports the "close" signal. + TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests" + // owner: @richabanker // alpha: v1.28 // @@ -971,8 +971,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31 - TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta}, - CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha}, ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha}, @@ -1139,6 +1137,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta}, + TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta}, + UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha}, VolumeAttributesClass: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go new file mode 100644 index 00000000000..0fb9e7f9cd1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go @@ -0,0 +1,61 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + subsystem = "apiserver" + statuscode = "code" +) + +var registerMetricsOnce sync.Once + +var ( + // streamTranslatorRequestsTotal counts the number of requests that were handled by + // the StreamTranslatorProxy. + streamTranslatorRequestsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: subsystem, + Name: "stream_translator_requests_total", + Help: "Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5", + StabilityLevel: metrics.ALPHA, + }, + []string{statuscode}, + ) +) + +func Register() { + registerMetricsOnce.Do(func() { + legacyregistry.MustRegister(streamTranslatorRequestsTotal) + }) +} + +func ResetForTest() { + streamTranslatorRequestsTotal.Reset() +} + +// IncStreamTranslatorRequest increments the # of requests handled by the StreamTranslatorProxy. +func IncStreamTranslatorRequest(ctx context.Context, status string) { + streamTranslatorRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go index 94ea13dff5b..6dabc1c7b4a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go @@ -20,12 +20,14 @@ import ( "fmt" "net/http" "net/url" + "strconv" "github.com/mxk/go-flowrate/flowrate" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/httpstream/spdy" constants "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apiserver/pkg/util/proxy/metrics" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/util/exec" ) @@ -61,6 +63,8 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req // to the client. websocketStreams, err := webSocketServerStreams(req, w, h.Options) if err != nil { + // Client error increments bad request status code. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusBadRequest)) return } defer websocketStreams.conn.Close() @@ -69,11 +73,13 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport}) if err != nil { websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) return } spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location) if err != nil { websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) return } @@ -115,10 +121,16 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req //nolint:errcheck // Ignore writeStatus returned error if statusErr, ok := err.(*apierrors.StatusError); ok { websocketStreams.writeStatus(statusErr) + // Increment status code returned within status error. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code))) } else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() { websocketStreams.writeStatus(codeExitToStatusError(exitErr)) + // Returned an exit code from the container, so not an error in + // stream translator--add StatusOK to metrics. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } else { websocketStreams.writeStatus(apierrors.NewInternalError(err)) + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) } return } @@ -128,6 +140,7 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusSuccess, }}) + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } // translatorSizeQueue feeds the size events from the WebSocket diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go index 6246c35d49c..992eef77a18 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go @@ -41,9 +41,12 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream/spdy" rcconstants "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/proxy/metrics" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" ) // TestStreamTranslator_LoopbackStdinToStdout returns random data sent on the client's @@ -53,6 +56,9 @@ import ( // websocket data into spdy). The returned data read on the websocket client STDOUT is then // compared the random data sent on STDIN to ensure they are the same. func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDOUT stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -132,6 +138,16 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { if !bytes.Equal(randomData, data) { t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData)) } + // Validate the streamtranslator metrics; should be one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_LoopbackStdinToStderr returns random data sent on the client's @@ -141,6 +157,9 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { // websocket data into spdy). The returned data read on the websocket client STDERR is then // compared the random data sent on STDIN to ensure they are the same. func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDERR stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -219,6 +238,16 @@ func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) { if !bytes.Equal(randomData, data) { t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData)) } + // Validate the streamtranslator metrics; should be one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // Returns a random exit code in the range(1-127). @@ -231,6 +260,9 @@ func randomExitCode() int { // TestStreamTranslator_ErrorStream tests the error stream by sending an error with a random // exit code, then validating the error arrives on the error stream. func TestStreamTranslator_ErrorStream(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) expectedExitCode := randomExitCode() // Create upstream fake SPDY server, returning a non-zero exit code // on error stream within the structured error. @@ -321,11 +353,24 @@ func TestStreamTranslator_ErrorStream(t *testing.T) { t.Errorf("expected error (%s), got (%s)", expectedError, err) } } + // Validate the streamtranslator metrics; an exit code error is considered 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_MultipleReadChannels tests two streams (STDOUT, STDERR) reading from // the connections at the same time. func TestStreamTranslator_MultipleReadChannels(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -417,6 +462,16 @@ func TestStreamTranslator_MultipleReadChannels(t *testing.T) { if !bytes.Equal(stderrBytes, randomData) { t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData)) } + // Validate the streamtranslator metrics; should have one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_ThrottleReadChannels tests two streams (STDOUT, STDERR) using rate limited streams. @@ -556,6 +611,9 @@ func randomTerminalSize() remotecommand.TerminalSize { // TestStreamTranslator_MultipleWriteChannels func TestStreamTranslator_TTYResizeChannel(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create the fake terminal size queue and the actualTerminalSizes which // will be received at the opposite websocket endpoint. numSizeQueue := 10000 @@ -633,11 +691,24 @@ func TestStreamTranslator_TTYResizeChannel(t *testing.T) { t.Errorf("expected terminal resize window %v, got %v", expected, actual) } } + // Validate the streamtranslator metrics; should have one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_WebSocketServerErrors validates that when there is a problem creating // the websocket server as the first step of the StreamTranslator an error is properly returned. func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) spdyLocation, err := url.Parse("http://127.0.0.1") if err != nil { t.Fatalf("Unable to parse spdy server URL") @@ -684,11 +755,24 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { t.Errorf("expected websocket bad handshake error, got (%s)", err) } } + // Validate the streamtranslator metrics; should have one 500 failure. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="400"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow // redirects; it will thrown an error instead. func TestStreamTranslator_BlockRedirects(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) for _, statusCode := range []int{ http.StatusMovedPermanently, // 301 http.StatusFound, // 302 @@ -744,6 +828,17 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) { t.Errorf("expected redirect not allowed error, got (%s)", err) } } + // Validate the streamtranslator metrics; should have one 500 failure each loop. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="500"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } + metrics.ResetForTest() // Clear metrics each loop } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8881d4818e5..eaee49eda99 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1525,6 +1525,7 @@ k8s.io/apiserver/pkg/util/openapi k8s.io/apiserver/pkg/util/peerproxy k8s.io/apiserver/pkg/util/peerproxy/metrics k8s.io/apiserver/pkg/util/proxy +k8s.io/apiserver/pkg/util/proxy/metrics k8s.io/apiserver/pkg/util/shufflesharding k8s.io/apiserver/pkg/util/webhook k8s.io/apiserver/pkg/util/x509metrics