streamtranslator counter metric by status code

This commit is contained in:
Sean Sullivan 2024-02-24 03:55:17 +00:00
parent a147693deb
commit 03812ddb16
5 changed files with 180 additions and 10 deletions

View File

@ -165,14 +165,6 @@ const (
// Enables kubelet to detect CSI volume condition and send the event of the abnormal volume to the corresponding pod that is using it.
CSIVolumeHealth featuregate.Feature = "CSIVolumeHealth"
// owner: @seans3
// kep: http://kep.k8s.io/4006
// alpha: v1.29
//
// Enables StreamTranslator proxy to handle WebSockets upgrade requests for the
// version of the RemoteCommand subprotocol that supports the "close" signal.
TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests"
// owner: @nckturner
// kep: http://kep.k8s.io/2699
// alpha: v1.27
@ -808,6 +800,14 @@ const (
// Allow the usage of options to fine-tune the topology manager policies.
TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions"
// owner: @seans3
// kep: http://kep.k8s.io/4006
// beta: v1.30
//
// Enables StreamTranslator proxy to handle WebSockets upgrade requests for the
// version of the RemoteCommand subprotocol that supports the "close" signal.
TranslateStreamCloseWebsocketRequests featuregate.Feature = "TranslateStreamCloseWebsocketRequests"
// owner: @richabanker
// alpha: v1.28
//
@ -971,8 +971,6 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
SkipReadOnlyValidationGCE: {Default: true, PreRelease: featuregate.Deprecated}, // remove in 1.31
TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta},
CloudControllerManagerWebhook: {Default: false, PreRelease: featuregate.Alpha},
ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},
@ -1139,6 +1137,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta},
TranslateStreamCloseWebsocketRequests: {Default: true, PreRelease: featuregate.Beta},
UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha},
VolumeAttributesClass: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -0,0 +1,61 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
subsystem = "apiserver"
statuscode = "code"
)
var registerMetricsOnce sync.Once
var (
// streamTranslatorRequestsTotal counts the number of requests that were handled by
// the StreamTranslatorProxy.
streamTranslatorRequestsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "stream_translator_requests_total",
Help: "Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5",
StabilityLevel: metrics.ALPHA,
},
[]string{statuscode},
)
)
func Register() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(streamTranslatorRequestsTotal)
})
}
func ResetForTest() {
streamTranslatorRequestsTotal.Reset()
}
// IncStreamTranslatorRequest increments the # of requests handled by the StreamTranslatorProxy.
func IncStreamTranslatorRequest(ctx context.Context, status string) {
streamTranslatorRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)
}

View File

@ -20,12 +20,14 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"github.com/mxk/go-flowrate/flowrate"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
constants "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apiserver/pkg/util/proxy/metrics"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/exec"
)
@ -61,6 +63,8 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
// to the client.
websocketStreams, err := webSocketServerStreams(req, w, h.Options)
if err != nil {
// Client error increments bad request status code.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusBadRequest))
return
}
defer websocketStreams.conn.Close()
@ -69,11 +73,13 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
spdyRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{UpgradeTransport: h.Transport})
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
return
}
spdyExecutor, err := remotecommand.NewSPDYExecutorRejectRedirects(spdyRoundTripper, spdyRoundTripper, "POST", h.Location)
if err != nil {
websocketStreams.writeStatus(apierrors.NewInternalError(err)) //nolint:errcheck
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
return
}
@ -115,10 +121,16 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
//nolint:errcheck // Ignore writeStatus returned error
if statusErr, ok := err.(*apierrors.StatusError); ok {
websocketStreams.writeStatus(statusErr)
// Increment status code returned within status error.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(int(statusErr.Status().Code)))
} else if exitErr, ok := err.(exec.CodeExitError); ok && exitErr.Exited() {
websocketStreams.writeStatus(codeExitToStatusError(exitErr))
// Returned an exit code from the container, so not an error in
// stream translator--add StatusOK to metrics.
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
} else {
websocketStreams.writeStatus(apierrors.NewInternalError(err))
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusInternalServerError))
}
return
}
@ -128,6 +140,7 @@ func (h *StreamTranslatorHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
websocketStreams.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusSuccess,
}})
metrics.IncStreamTranslatorRequest(req.Context(), strconv.Itoa(http.StatusOK))
}
// translatorSizeQueue feeds the size events from the WebSocket

View File

@ -41,9 +41,12 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
rcconstants "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/proxy/metrics"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
// TestStreamTranslator_LoopbackStdinToStdout returns random data sent on the client's
@ -53,6 +56,9 @@ import (
// websocket data into spdy). The returned data read on the websocket client STDOUT is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -132,6 +138,16 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_LoopbackStdinToStderr returns random data sent on the client's
@ -141,6 +157,9 @@ func TestStreamTranslator_LoopbackStdinToStdout(t *testing.T) {
// websocket data into spdy). The returned data read on the websocket client STDERR is then
// compared the random data sent on STDIN to ensure they are the same.
func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -219,6 +238,16 @@ func TestStreamTranslator_LoopbackStdinToStderr(t *testing.T) {
if !bytes.Equal(randomData, data) {
t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
}
// Validate the streamtranslator metrics; should be one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// Returns a random exit code in the range(1-127).
@ -231,6 +260,9 @@ func randomExitCode() int {
// TestStreamTranslator_ErrorStream tests the error stream by sending an error with a random
// exit code, then validating the error arrives on the error stream.
func TestStreamTranslator_ErrorStream(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
expectedExitCode := randomExitCode()
// Create upstream fake SPDY server, returning a non-zero exit code
// on error stream within the structured error.
@ -321,11 +353,24 @@ func TestStreamTranslator_ErrorStream(t *testing.T) {
t.Errorf("expected error (%s), got (%s)", expectedError, err)
}
}
// Validate the streamtranslator metrics; an exit code error is considered 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_MultipleReadChannels tests two streams (STDOUT, STDERR) reading from
// the connections at the same time.
func TestStreamTranslator_MultipleReadChannels(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create upstream fake SPDY server which copies STDIN back onto STDOUT and STDERR stream.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createSPDYServerStreams(w, req, Options{
@ -417,6 +462,16 @@ func TestStreamTranslator_MultipleReadChannels(t *testing.T) {
if !bytes.Equal(stderrBytes, randomData) {
t.Errorf("unexpected data received: %d sent: %d", len(stderrBytes), len(randomData))
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_ThrottleReadChannels tests two streams (STDOUT, STDERR) using rate limited streams.
@ -556,6 +611,9 @@ func randomTerminalSize() remotecommand.TerminalSize {
// TestStreamTranslator_MultipleWriteChannels
func TestStreamTranslator_TTYResizeChannel(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
// Create the fake terminal size queue and the actualTerminalSizes which
// will be received at the opposite websocket endpoint.
numSizeQueue := 10000
@ -633,11 +691,24 @@ func TestStreamTranslator_TTYResizeChannel(t *testing.T) {
t.Errorf("expected terminal resize window %v, got %v", expected, actual)
}
}
// Validate the streamtranslator metrics; should have one 200 success.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="200"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_WebSocketServerErrors validates that when there is a problem creating
// the websocket server as the first step of the StreamTranslator an error is properly returned.
func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
spdyLocation, err := url.Parse("http://127.0.0.1")
if err != nil {
t.Fatalf("Unable to parse spdy server URL")
@ -684,11 +755,24 @@ func TestStreamTranslator_WebSocketServerErrors(t *testing.T) {
t.Errorf("expected websocket bad handshake error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="400"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
}
// TestStreamTranslator_BlockRedirects verifies that the StreamTranslator will *not* follow
// redirects; it will thrown an error instead.
func TestStreamTranslator_BlockRedirects(t *testing.T) {
metrics.Register()
metrics.ResetForTest()
t.Cleanup(metrics.ResetForTest)
for _, statusCode := range []int{
http.StatusMovedPermanently, // 301
http.StatusFound, // 302
@ -744,6 +828,17 @@ func TestStreamTranslator_BlockRedirects(t *testing.T) {
t.Errorf("expected redirect not allowed error, got (%s)", err)
}
}
// Validate the streamtranslator metrics; should have one 500 failure each loop.
metricNames := []string{"apiserver_stream_translator_requests_total"}
expected := `
# HELP apiserver_stream_translator_requests_total [ALPHA] Total number of requests that were handled by the StreamTranslatorProxy, which processes streaming RemoteCommand/V5
# TYPE apiserver_stream_translator_requests_total counter
apiserver_stream_translator_requests_total{code="500"} 1
`
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expected), metricNames...); err != nil {
t.Fatal(err)
}
metrics.ResetForTest() // Clear metrics each loop
}
}

1
vendor/modules.txt vendored
View File

@ -1525,6 +1525,7 @@ k8s.io/apiserver/pkg/util/openapi
k8s.io/apiserver/pkg/util/peerproxy
k8s.io/apiserver/pkg/util/peerproxy/metrics
k8s.io/apiserver/pkg/util/proxy
k8s.io/apiserver/pkg/util/proxy/metrics
k8s.io/apiserver/pkg/util/shufflesharding
k8s.io/apiserver/pkg/util/webhook
k8s.io/apiserver/pkg/util/x509metrics