Merge pull request #117295 from aojea/transport_cache_metrics

add new metric for the internal client-go cache size
This commit is contained in:
Kubernetes Prow Robot 2023-05-11 08:59:02 -07:00 committed by GitHub
commit 64af2d93e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 279 additions and 5 deletions

View File

@ -20,7 +20,7 @@ import (
"os"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/kubernetes/cmd/kube-proxy/app"
)

View File

@ -26,7 +26,7 @@ import (
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration
_ "k8s.io/component-base/metrics/prometheus/restclient"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/kubernetes/cmd/kubelet/app"
)

View File

@ -64,6 +64,17 @@ type RetryMetric interface {
IncrementRetry(ctx context.Context, code string, method string, host string)
}
// TransportCacheMetric shows the number of entries in the internal transport cache
type TransportCacheMetric interface {
Observe(value int)
}
// TransportCreateCallsMetric counts the number of times a transport is created
// partitioned by the result of the cache: hit, miss, uncacheable
type TransportCreateCallsMetric interface {
Increment(result string)
}
var (
// ClientCertExpiry is the expiry time of a client certificate
ClientCertExpiry ExpiryMetric = noopExpiry{}
@ -85,6 +96,12 @@ var (
// RequestRetry is the retry metric that tracks the number of
// retries sent to the server.
RequestRetry RetryMetric = noopRetry{}
// TransportCacheEntries is the metric that tracks the number of entries in the
// internal transport cache.
TransportCacheEntries TransportCacheMetric = noopTransportCache{}
// TransportCreateCalls is the metric that counts the number of times a new transport
// is created
TransportCreateCalls TransportCreateCallsMetric = noopTransportCreateCalls{}
)
// RegisterOpts contains all the metrics to register. Metrics may be nil.
@ -98,6 +115,8 @@ type RegisterOpts struct {
RequestResult ResultMetric
ExecPluginCalls CallsMetric
RequestRetry RetryMetric
TransportCacheEntries TransportCacheMetric
TransportCreateCalls TransportCreateCallsMetric
}
// Register registers metrics for the rest client to use. This can
@ -131,6 +150,12 @@ func Register(opts RegisterOpts) {
if opts.RequestRetry != nil {
RequestRetry = opts.RequestRetry
}
if opts.TransportCacheEntries != nil {
TransportCacheEntries = opts.TransportCacheEntries
}
if opts.TransportCreateCalls != nil {
TransportCreateCalls = opts.TransportCreateCalls
}
})
}
@ -161,3 +186,11 @@ func (noopCalls) Increment(int, string) {}
type noopRetry struct{}
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}
type noopTransportCache struct{}
func (noopTransportCache) Observe(int) {}
type noopTransportCreateCalls struct{}
func (noopTransportCreateCalls) Increment(string) {}

View File

@ -27,6 +27,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/metrics"
)
// TlsTransportCache caches TLS http.RoundTrippers different configurations. The
@ -80,11 +81,16 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
// Ensure we only create a single transport for the given TLS options
c.mu.Lock()
defer c.mu.Unlock()
defer metrics.TransportCacheEntries.Observe(len(c.transports))
// See if we already have a custom transport for this config
if t, ok := c.transports[key]; ok {
metrics.TransportCreateCalls.Increment("hit")
return t, nil
}
metrics.TransportCreateCalls.Increment("miss")
} else {
metrics.TransportCreateCalls.Increment("uncacheable")
}
// Get the TLS options for this client config

View File

@ -152,6 +152,24 @@ var (
},
[]string{"code", "call_status"},
)
transportCacheEntries = k8smetrics.NewGauge(
&k8smetrics.GaugeOpts{
Name: "rest_client_transport_cache_entries",
StabilityLevel: k8smetrics.ALPHA,
Help: "Number of transport entries in the internal cache.",
},
)
transportCacheCalls = k8smetrics.NewCounterVec(
&k8smetrics.CounterOpts{
Name: "rest_client_transport_create_calls_total",
StabilityLevel: k8smetrics.ALPHA,
Help: "Number of calls to get a new transport, partitioned by the result of the operation " +
"hit: obtained from the cache, miss: created and added to the cache, uncacheable: created and not cached",
},
[]string{"result"},
)
)
func init() {
@ -164,6 +182,9 @@ func init() {
legacyregistry.MustRegister(requestRetry)
legacyregistry.RawMustRegister(execPluginCertTTL)
legacyregistry.MustRegister(execPluginCertRotation)
legacyregistry.MustRegister(execPluginCalls)
legacyregistry.MustRegister(transportCacheEntries)
legacyregistry.MustRegister(transportCacheCalls)
metrics.Register(metrics.RegisterOpts{
ClientCertExpiry: execPluginCertTTLAdapter,
ClientCertRotationAge: &rotationAdapter{m: execPluginCertRotation},
@ -174,6 +195,8 @@ func init() {
RequestResult: &resultAdapter{requestResult},
RequestRetry: &retryAdapter{requestRetry},
ExecPluginCalls: &callsAdapter{m: execPluginCalls},
TransportCacheEntries: &transportCacheAdapter{m: transportCacheEntries},
TransportCreateCalls: &transportCacheCallsAdapter{m: transportCacheCalls},
})
}
@ -232,3 +255,19 @@ type retryAdapter struct {
func (r *retryAdapter) IncrementRetry(ctx context.Context, code, method, host string) {
r.m.WithContext(ctx).WithLabelValues(code, method, host).Inc()
}
type transportCacheAdapter struct {
m *k8smetrics.Gauge
}
func (t *transportCacheAdapter) Observe(value int) {
t.m.Set(float64(value))
}
type transportCacheCallsAdapter struct {
m *k8smetrics.CounterVec
}
func (t *transportCacheCallsAdapter) Increment(result string) {
t.m.WithLabelValues(result).Inc()
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,169 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
// the metrics are loaded on cmd/kube-apiserver/apiserver.go
// so we need to load them here to be available for the test
_ "k8s.io/component-base/metrics/prometheus/restclient"
)
// IMPORTANT: metrics are stored globally so all the test must run serially
// and reset the metrics.
// regression test for https://issues.k8s.io/117258
func TestAPIServerTransportMetrics(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, "AllAlpha", true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, "AllBeta", true)()
// reset default registry metrics
legacyregistry.Reset()
result := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
defer result.TearDownFn()
client := clientset.NewForConfigOrDie(result.ClientConfig)
// IMPORTANT: reflect the current values if the test changes
// client_test.go:1407: metric rest_client_transport_cache_entries 3
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 61
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="miss"} 3
hits1, misses1, entries1 := checkTransportMetrics(t, client)
// hit ratio at startup depends on multiple factors
if (hits1*100)/(hits1+misses1) < 90 {
t.Fatalf("transport cache hit ratio %d lower than 90 percent", (hits1*100)/(hits1+misses1))
}
aggregatorClient := aggregatorclient.NewForConfigOrDie(result.ClientConfig)
aggregatedAPI := &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
Spec: apiregistrationv1.APIServiceSpec{
Service: &apiregistrationv1.ServiceReference{
Namespace: "kube-wardle",
Name: "api",
},
Group: "wardle.example.com",
Version: "v1alpha1",
GroupPriorityMinimum: 200,
VersionPriority: 200,
},
}
_, err := aggregatorClient.ApiregistrationV1().APIServices().Create(context.Background(), aggregatedAPI, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
requests := 30
errors := 0
for i := 0; i < requests; i++ {
apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(context.Background(), "v1alpha1.wardle.example.com", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// mutate the object
apiService.Labels = map[string]string{"key": fmt.Sprintf("val%d", i)}
_, err = aggregatorClient.ApiregistrationV1().APIServices().Update(context.Background(), apiService, metav1.UpdateOptions{})
if err != nil && !apierrors.IsConflict(err) {
t.Logf("unexpected error: %v", err)
errors++
}
}
if (errors*100)/requests > 20 {
t.Fatalf("high number of errors during the test %d out of %d", errors, requests)
}
// IMPORTANT: reflect the current values if the test changes
// client_test.go:1407: metric rest_client_transport_cache_entries 4
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 120
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="miss"} 4
hits2, misses2, entries2 := checkTransportMetrics(t, client)
if entries2-entries1 > 10 {
t.Fatalf("possible transport leak, number of new cache entries increased by %d", entries2-entries1)
}
// hit ratio after startup should grow since no new transports are expected
if (hits2*100)/(hits2+misses2) < 95 {
t.Fatalf("transport cache hit ratio %d lower than 95 percent", (hits2*100)/(hits2+misses2))
}
}
func checkTransportMetrics(t *testing.T, client *clientset.Clientset) (hits int, misses int, entries int) {
t.Helper()
body, err := client.RESTClient().Get().AbsPath("/metrics").DoRaw(context.Background())
if err != nil {
t.Fatal(err)
}
// TODO: this can be much better if there is some library that parse prometheus metrics
// the existing one in "k8s.io/component-base/metrics/testutil" uses the global variable
// but we want to parse the ones returned by the endpoint to be sure the metrics are
// exposed correctly
for _, line := range strings.Split(string(body), "\n") {
if !strings.HasPrefix(line, "rest_client_transport") {
continue
}
if strings.Contains(line, "uncacheable") {
t.Fatalf("detected transport that is not cacheable, please check https://issues.k8s.io/112017")
}
output := strings.Split(line, " ")
if len(output) != 2 {
t.Fatalf("expected metrics to be in the format name value, got %v", output)
}
name := output[0]
value, err := strconv.Atoi(output[1])
if err != nil {
t.Fatalf("metric value can not be converted to integer %v", err)
}
switch name {
case "rest_client_transport_cache_entries":
entries = value
case `rest_client_transport_create_calls_total{result="hit"}`:
hits = value
case `rest_client_transport_create_calls_total{result="miss"}`:
misses = value
}
t.Logf("metric %s", line)
}
if misses != entries || misses == 0 {
t.Errorf("expected as many entries %d in the cache as misses, got %d", entries, misses)
}
if hits < misses {
t.Errorf("expected more hits %d in the cache than misses %d", hits, misses)
}
return
}