Merge pull request #108396 from tkashem/retry-metric

client-go: add a metric to count request retries
This commit is contained in:
Kubernetes Prow Robot 2023-02-06 15:30:58 -08:00 committed by GitHub
commit 2ab3151936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 179 additions and 17 deletions

View File

@ -726,7 +726,6 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
} }
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err) retry.After(ctx, r, resp, err)
if err == nil && resp.StatusCode == http.StatusOK { if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp) return r.newStreamWatcher(resp)
@ -786,22 +785,36 @@ func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error)
), nil ), nil
} }
// updateURLMetrics is a convenience function for pushing metrics. // updateRequestResultMetric increments the RequestResult metric counter,
// It also handles corner cases for incomplete/invalid request data. // it should be called with the (response, err) tuple from the final
func updateURLMetrics(ctx context.Context, req *Request, resp *http.Response, err error) { // reply from the server.
url := "none" func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestResult.Increment(ctx, code, req.verb, host)
}
// updateRequestRetryMetric increments the RequestRetry metric counter,
// it should be called with the (response, err) tuple for each retry
// except for the final attempt.
func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
code, host := sanitize(req, resp, err)
metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
}
func sanitize(req *Request, resp *http.Response, err error) (string, string) {
host := "none"
if req.c.base != nil { if req.c.base != nil {
url = req.c.base.Host host = req.c.base.Host
} }
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
// system so we just report them as `<error>`. // system so we just report them as `<error>`.
if err != nil { code := "<error>"
metrics.RequestResult.Increment(ctx, "<error>", req.verb, url) if resp != nil {
} else { code = strconv.Itoa(resp.StatusCode)
// Metrics for failure codes
metrics.RequestResult.Increment(ctx, strconv.Itoa(resp.StatusCode), req.verb, url)
} }
return code, host
} }
// Stream formats and executes the request, and offers streaming of the response. // Stream formats and executes the request, and offers streaming of the response.
@ -834,7 +847,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return nil, err return nil, err
} }
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
retry.After(ctx, r, resp, err) retry.After(ctx, r, resp, err)
if err != nil { if err != nil {
// we only retry on an HTTP response with 'Retry-After' header // we only retry on an HTTP response with 'Retry-After' header
@ -979,7 +991,6 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return err return err
} }
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown. // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request // https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) { if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {

View File

@ -2991,6 +2991,7 @@ type withRateLimiterBackoffManagerAndMetrics struct {
metrics.ResultMetric metrics.ResultMetric
calculateBackoffSeq int64 calculateBackoffSeq int64
calculateBackoffFn func(i int64) time.Duration calculateBackoffFn func(i int64) time.Duration
metrics.RetryMetric
invokeOrderGot []string invokeOrderGot []string
sleepsGot []string sleepsGot []string
@ -3027,6 +3028,14 @@ func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context
} }
} }
func (lb *withRateLimiterBackoffManagerAndMetrics) IncrementRetry(ctx context.Context, code, _, _ string) {
// we are interested in the request context that is marked by this test
if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestRetry.IncrementRetry")
lb.statusCodesGot = append(lb.statusCodesGot, code)
}
}
func (lb *withRateLimiterBackoffManagerAndMetrics) Do() { func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do") lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
} }
@ -3072,13 +3081,17 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
"Client.Do", "Client.Do",
// it's a success, so do the following: // it's a success, so do the following:
// - call metrics and update backoff parameters // count the result metric, and since it's a retry,
// count the retry metric, and then update backoff parameters.
"RequestResult.Increment", "RequestResult.Increment",
"RequestRetry.IncrementRetry",
"BackoffManager.UpdateBackoff", "BackoffManager.UpdateBackoff",
} }
statusCodesWant := []string{ statusCodesWant := []string{
// first attempt (A): we count the result metric only
"500", "500",
"200", // final attempt (B): we count the result metric, and the retry metric
"200", "200",
} }
tests := []struct { tests := []struct {
@ -3192,10 +3205,13 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc
// to override as well, and we want tests to be able to run in // to override as well, and we want tests to be able to run in
// parallel then we will need to provide a way for tests to // parallel then we will need to provide a way for tests to
// register/deregister their own metric inerfaces. // register/deregister their own metric inerfaces.
old := metrics.RequestResult oldRequestResult := metrics.RequestResult
oldRequestRetry := metrics.RequestRetry
metrics.RequestResult = interceptor metrics.RequestResult = interceptor
metrics.RequestRetry = interceptor
defer func() { defer func() {
metrics.RequestResult = old metrics.RequestResult = oldRequestResult
metrics.RequestRetry = oldRequestRetry
}() }()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View File

@ -242,8 +242,20 @@ func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Resp
// parameters calculated from the (response, err) tuple from // parameters calculated from the (response, err) tuple from
// attempt N-1, so r.retryAfter is outdated and should not be // attempt N-1, so r.retryAfter is outdated and should not be
// referred to here. // referred to here.
isRetry := r.retryAfter != nil
r.retryAfter = nil r.retryAfter = nil
// the client finishes a single request after N attempts (1..N)
// - all attempts (1..N) are counted to the rest_client_requests_total
// metric (current behavior).
// - every attempt after the first (2..N) are counted to the
// rest_client_request_retries_total metric.
updateRequestResultMetric(ctx, request, resp, err)
if isRetry {
// this is attempt 2 or later
updateRequestRetryMetric(ctx, request, resp, err)
}
if request.c.base != nil { if request.c.base != nil {
if err != nil { if err != nil {
request.backoff.UpdateBackoff(request.URL(), err, 0) request.backoff.UpdateBackoff(request.URL(), err, 0)

View File

@ -58,6 +58,12 @@ type CallsMetric interface {
Increment(exitCode int, callStatus string) Increment(exitCode int, callStatus string)
} }
// RetryMetric counts the number of retries sent to the server
// partitioned by code, method, and host.
type RetryMetric interface {
IncrementRetry(ctx context.Context, code string, method string, host string)
}
var ( var (
// ClientCertExpiry is the expiry time of a client certificate // ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{} ClientCertExpiry ExpiryMetric = noopExpiry{}
@ -76,6 +82,9 @@ var (
// ExecPluginCalls is the number of calls made to an exec plugin, partitioned by // ExecPluginCalls is the number of calls made to an exec plugin, partitioned by
// exit code and call status. // exit code and call status.
ExecPluginCalls CallsMetric = noopCalls{} ExecPluginCalls CallsMetric = noopCalls{}
// RequestRetry is the retry metric that tracks the number of
// retries sent to the server.
RequestRetry RetryMetric = noopRetry{}
) )
// RegisterOpts contains all the metrics to register. Metrics may be nil. // RegisterOpts contains all the metrics to register. Metrics may be nil.
@ -88,6 +97,7 @@ type RegisterOpts struct {
RateLimiterLatency LatencyMetric RateLimiterLatency LatencyMetric
RequestResult ResultMetric RequestResult ResultMetric
ExecPluginCalls CallsMetric ExecPluginCalls CallsMetric
RequestRetry RetryMetric
} }
// Register registers metrics for the rest client to use. This can // Register registers metrics for the rest client to use. This can
@ -118,6 +128,9 @@ func Register(opts RegisterOpts) {
if opts.ExecPluginCalls != nil { if opts.ExecPluginCalls != nil {
ExecPluginCalls = opts.ExecPluginCalls ExecPluginCalls = opts.ExecPluginCalls
} }
if opts.RequestRetry != nil {
RequestRetry = opts.RequestRetry
}
}) })
} }
@ -144,3 +157,7 @@ func (noopResult) Increment(context.Context, string, string, string) {}
type noopCalls struct{} type noopCalls struct{}
func (noopCalls) Increment(int, string) {} func (noopCalls) Increment(int, string) {}
type noopRetry struct{}
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}

View File

@ -82,6 +82,15 @@ var (
[]string{"code", "verb", "host"}, []string{"code", "verb", "host"},
) )
requestRetry = k8smetrics.NewCounterVec(
&k8smetrics.CounterOpts{
Name: "rest_client_request_retries_total",
StabilityLevel: k8smetrics.ALPHA,
Help: "Number of request retries, partitioned by status code, verb, and host.",
},
[]string{"code", "verb", "host"},
)
execPluginCertTTLAdapter = &expiryToTTLAdapter{} execPluginCertTTLAdapter = &expiryToTTLAdapter{}
execPluginCertTTL = k8smetrics.NewGaugeFunc( execPluginCertTTL = k8smetrics.NewGaugeFunc(
@ -152,6 +161,7 @@ func init() {
legacyregistry.MustRegister(responseSize) legacyregistry.MustRegister(responseSize)
legacyregistry.MustRegister(rateLimiterLatency) legacyregistry.MustRegister(rateLimiterLatency)
legacyregistry.MustRegister(requestResult) legacyregistry.MustRegister(requestResult)
legacyregistry.MustRegister(requestRetry)
legacyregistry.RawMustRegister(execPluginCertTTL) legacyregistry.RawMustRegister(execPluginCertTTL)
legacyregistry.MustRegister(execPluginCertRotation) legacyregistry.MustRegister(execPluginCertRotation)
metrics.Register(metrics.RegisterOpts{ metrics.Register(metrics.RegisterOpts{
@ -162,6 +172,7 @@ func init() {
ResponseSize: &sizeAdapter{m: responseSize}, ResponseSize: &sizeAdapter{m: responseSize},
RateLimiterLatency: &latencyAdapter{m: rateLimiterLatency}, RateLimiterLatency: &latencyAdapter{m: rateLimiterLatency},
RequestResult: &resultAdapter{requestResult}, RequestResult: &resultAdapter{requestResult},
RequestRetry: &retryAdapter{requestRetry},
ExecPluginCalls: &callsAdapter{m: execPluginCalls}, ExecPluginCalls: &callsAdapter{m: execPluginCalls},
}) })
} }
@ -213,3 +224,11 @@ type callsAdapter struct {
func (r *callsAdapter) Increment(code int, callStatus string) { func (r *callsAdapter) Increment(code int, callStatus string) {
r.m.WithLabelValues(fmt.Sprintf("%d", code), callStatus).Inc() r.m.WithLabelValues(fmt.Sprintf("%d", code), callStatus).Inc()
} }
type retryAdapter struct {
m *k8smetrics.CounterVec
}
func (r *retryAdapter) IncrementRetry(ctx context.Context, code, method, host string) {
r.m.WithContext(ctx).WithLabelValues(code, method, host).Inc()
}

View File

@ -0,0 +1,87 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package restclient
import (
"context"
"strings"
"testing"
"k8s.io/client-go/tools/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
func TestClientGOMetrics(t *testing.T) {
tests := []struct {
description string
name string
metric interface{}
update func()
want string
}{
{
description: "Number of HTTP requests, partitioned by status code, verb, and host.",
name: "rest_client_requests_total",
metric: requestResult,
update: func() {
metrics.RequestResult.Increment(context.TODO(), "200", "POST", "www.foo.com")
},
want: `
# HELP rest_client_requests_total [ALPHA] Number of HTTP requests, partitioned by status code, verb, and host.
# TYPE rest_client_requests_total counter
rest_client_requests_total{code="200",host="www.foo.com",verb="POST"} 1
`,
},
{
description: "Number of request retries, partitioned by status code, verb, and host.",
name: "rest_client_request_retries_total",
metric: requestRetry,
update: func() {
metrics.RequestRetry.IncrementRetry(context.TODO(), "500", "GET", "www.bar.com")
},
want: `
# HELP rest_client_request_retries_total [ALPHA] Number of request retries, partitioned by status code, verb, and host.
# TYPE rest_client_request_retries_total counter
rest_client_request_retries_total{code="500",host="www.bar.com",verb="GET"} 1
`,
},
}
// no need to register the metrics here, since the init function of
// the package registers all the client-go metrics.
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
resetter, resettable := test.metric.(interface {
Reset()
})
if !resettable {
t.Fatalf("the metric must be resettaable: %s", test.name)
}
// Since prometheus' gatherer is global, other tests may have updated
// metrics already, so we need to reset them prior to running this test.
// This also implies that we can't run this test in parallel with other tests.
resetter.Reset()
test.update()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(test.want), test.name); err != nil {
t.Fatal(err)
}
})
}
}