diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 0551bc2a34a..06aa971a260 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -165,14 +165,6 @@ const ( // Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it. CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth" - // owner: @seans3 - // kep: http://kep.k8s.io/4006 - // alpha: v1.29 - // - // Enables StreamTranslator proxy to handle WebSockets upgrade requests for the - // version of the RemoteCommand subprotocol that supports the "close" signal. - TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests" - // owner: @nckturner // kep: http://kep.k8s.io/2699 // alpha: v1.27 @@ -808,6 +800,14 @@ const ( // Allow the usage of options to fine-tune the topology manager policies. TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions" + // owner: @seans3 + // kep: http://kep.k8s.io/4006 + // beta: v1.30 + // + // Enables StreamTranslator proxy to handle WebSockets upgrade requests for the + // version of the RemoteCommand subprotocol that supports the "close" signal. + TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests" + // owner: @richabanker // alpha: v1.28 // @@ -971,8 +971,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31 - TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta}, - CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha}, ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha}, @@ -1139,6 +1137,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta}, + TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta}, + UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha}, VolumeAttributesClass: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go new file mode 100644 index 00000000000..0fb9e7f9cd1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/metrics/metrics.go @@ -0,0 +1,61 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + subsystem = "apiserver" + statuscode = "code" +) + +var registerMetricsOnce sync.Once + +var ( + // streamTranslatorRequestsTotal counts the number of requests that were handled by + // the StreamTranslatorProxy. + streamTranslatorRequestsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: subsystem, + Name: "stream_translator_requests_total", + Help: "Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5", + StabilityLevel: metrics.ALPHA, + }, + []string{statuscode}, + ) +) + +func Register() { + registerMetricsOnce.Do(func() { + legacyregistry.MustRegister(streamTranslatorRequestsTotal) + }) +} + +func ResetForTest() { + streamTranslatorRequestsTotal.Reset() +} + +// IncStreamTranslatorRequest increments the # of requests handled by the StreamTranslatorProxy. +func IncStreamTranslatorRequest(ctx context.Context, status string) { + streamTranslatorRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go index 94ea13dff5b..6dabc1c7b4a 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator.go @@ -20,12 +20,14 @@ import ( "fmt" "net/http" "net/url" + "strconv" "github.com/mxk/go-flowrate/flowrate" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/httpstream/spdy" constants "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apiserver/pkg/util/proxy/metrics" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/util/exec" ) @@ -61,6 +63,8 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req // to the client. websocketStreams, err := webSocketServerStreams(req, w, h.Options) if err != nil { + // Client error increments bad request status code. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusBadRequest)) return } defer websocketStreams.conn.Close() @@ -69,11 +73,13 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport}) if err != nil { websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) return } spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location) if err != nil { websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) return } @@ -115,10 +121,16 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req //nolint:errcheck // Ignore writeStatus returned error if statusErr, ok := err.(*apierrors.StatusError); ok { websocketStreams.writeStatus(statusErr) + // Increment status code returned within status error. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code))) } else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() { websocketStreams.writeStatus(codeExitToStatusError(exitErr)) + // Returned an exit code from the container, so not an error in + // stream translator--add StatusOK to metrics. + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } else { websocketStreams.writeStatus(apierrors.NewInternalError(err)) + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError)) } return } @@ -128,6 +140,7 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusSuccess, }}) + metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK)) } // translatorSizeQueue feeds the size events from the WebSocket diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go index 6246c35d49c..992eef77a18 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/streamtranslator_test.go @@ -41,9 +41,12 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream/spdy" rcconstants "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/proxy/metrics" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" ) // TestStreamTranslator_LoopbackStdinToStdout returns random data sent on the client's @@ -53,6 +56,9 @@ import ( // websocket data into spdy). The returned data read on the websocket client STDOUT is then // compared the random data sent on STDIN to ensure they are the same. func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDOUT stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -132,6 +138,16 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { if !bytes.Equal(randomData, data) { t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData)) } + // Validate the streamtranslator metrics; should be one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_LoopbackStdinToStderr returns random data sent on the client's @@ -141,6 +157,9 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) { // websocket data into spdy). The returned data read on the websocket client STDERR is then // compared the random data sent on STDIN to ensure they are the same. func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDERR stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -219,6 +238,16 @@ func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) { if !bytes.Equal(randomData, data) { t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData)) } + // Validate the streamtranslator metrics; should be one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // Returns a random exit code in the range(1-127). @@ -231,6 +260,9 @@ func randomExitCode() int { // TestStreamTranslator_ErrorStream tests the error stream by sending an error with a random // exit code, then validating the error arrives on the error stream. func TestStreamTranslator_ErrorStream(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) expectedExitCode := randomExitCode() // Create upstream fake SPDY server, returning a non-zero exit code // on error stream within the structured error. @@ -321,11 +353,24 @@ func TestStreamTranslator_ErrorStream(t *testing.T) { t.Errorf("expected error (%s), got (%s)", expectedError, err) } } + // Validate the streamtranslator metrics; an exit code error is considered 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_MultipleReadChannels tests two streams (STDOUT, STDERR) reading from // the connections at the same time. func TestStreamTranslator_MultipleReadChannels(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream. spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, err := createSPDYServerStreams(w, req, Options{ @@ -417,6 +462,16 @@ func TestStreamTranslator_MultipleReadChannels(t *testing.T) { if !bytes.Equal(stderrBytes, randomData) { t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData)) } + // Validate the streamtranslator metrics; should have one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_ThrottleReadChannels tests two streams (STDOUT, STDERR) using rate limited streams. @@ -556,6 +611,9 @@ func randomTerminalSize() remotecommand.TerminalSize { // TestStreamTranslator_MultipleWriteChannels func TestStreamTranslator_TTYResizeChannel(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) // Create the fake terminal size queue and the actualTerminalSizes which // will be received at the opposite websocket endpoint. numSizeQueue := 10000 @@ -633,11 +691,24 @@ func TestStreamTranslator_TTYResizeChannel(t *testing.T) { t.Errorf("expected terminal resize window %v, got %v", expected, actual) } } + // Validate the streamtranslator metrics; should have one 200 success. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="200"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_WebSocketServerErrors validates that when there is a problem creating // the websocket server as the first step of the StreamTranslator an error is properly returned. func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) spdyLocation, err := url.Parse("http://127.0.0.1") if err != nil { t.Fatalf("Unable to parse spdy server URL") @@ -684,11 +755,24 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) { t.Errorf("expected websocket bad handshake error, got (%s)", err) } } + // Validate the streamtranslator metrics; should have one 500 failure. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="400"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } } // TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow // redirects; it will thrown an error instead. func TestStreamTranslator_BlockRedirects(t *testing.T) { + metrics.Register() + metrics.ResetForTest() + t.Cleanup(metrics.ResetForTest) for _, statusCode := range []int{ http.StatusMovedPermanently, // 301 http.StatusFound, // 302 @@ -744,6 +828,17 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) { t.Errorf("expected redirect not allowed error, got (%s)", err) } } + // Validate the streamtranslator metrics; should have one 500 failure each loop. + metricNames := []string{"apiserver_stream_translator_requests_total"} + expected := ` +# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5 +# TYPE apiserver_stream_translator_requests_total counter +apiserver_stream_translator_requests_total{code="500"} 1 +` + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil { + t.Fatal(err) + } + metrics.ResetForTest() // Clear metrics each loop } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8881d4818e5..eaee49eda99 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1525,6 +1525,7 @@ k8s.io/apiserver/pkg/util/openapi k8s.io/apiserver/pkg/util/peerproxy k8s.io/apiserver/pkg/util/peerproxy/metrics k8s.io/apiserver/pkg/util/proxy +k8s.io/apiserver/pkg/util/proxy/metrics k8s.io/apiserver/pkg/util/shufflesharding k8s.io/apiserver/pkg/util/webhook k8s.io/apiserver/pkg/util/x509metrics