Merge pull request #104983 from MikeSpreitzer/list-metrics-take3

Try yet again to add metrics about LIST handling
This commit is contained in:
Kubernetes Prow Robot 2021-09-22 07:16:02 -07:00 committed by GitHub
commit 5b489e2846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 191 additions and 28 deletions

View File

@ -19,6 +19,7 @@ package routes
import ( import (
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics" apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/mux"
cachermetrics "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics" etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics"
flowcontrolmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" flowcontrolmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
@ -46,6 +47,7 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
// register apiserver and etcd metrics // register apiserver and etcd metrics
func register() { func register() {
apimetrics.Register() apimetrics.Register()
cachermetrics.Register()
etcd3metrics.Register() etcd3metrics.Register()
flowcontrolmetrics.Register() flowcontrolmetrics.Register()
} }

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -191,7 +192,7 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
if bucketID < t.startBucketID { if bucketID < t.startBucketID {
bucketID = t.startBucketID bucketID = t.startBucketID
} }
watchers, _ := t.watchersBuckets[bucketID] watchers := t.watchersBuckets[bucketID]
t.watchersBuckets[bucketID] = append(watchers, w) t.watchersBuckets[bucketID] = append(watchers, w)
return true return true
} }
@ -231,6 +232,8 @@ type Cacher struct {
// Incoming events that should be dispatched to watchers. // Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent incoming chan watchCacheEvent
resourcePrefix string
sync.RWMutex sync.RWMutex
// Before accessing the cacher's cache, wait for the ready to be ok. // Before accessing the cacher's cache, wait for the ready to be ok.
@ -329,6 +332,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
} }
objType := reflect.TypeOf(obj) objType := reflect.TypeOf(obj)
cacher := &Cacher{ cacher := &Cacher{
resourcePrefix: config.ResourcePrefix,
ready: newReady(), ready: newReady(),
storage: config.Storage, storage: config.Storage,
objectType: objType, objectType: objType,
@ -718,7 +722,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
} }
filter := filterWithAttrsFunction(key, pred) filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
if err != nil { if err != nil {
return err return err
} }
@ -744,6 +748,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
return err return err
} }
} }
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len())
return nil return nil
} }

View File

@ -0,0 +1,8 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-instrumentation-approvers
reviewers:
- sig-instrumentation-reviewers
labels:
- sig/instrumentation

View File

@ -0,0 +1,78 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
listCacheCount = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_cache_list_total",
Help: "Number of LIST requests served from watch cache",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource_prefix", "index"},
)
listCacheNumFetched = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_cache_list_fetched_objects_total",
Help: "Number of objects read from watch cache in the course of serving a LIST request",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource_prefix", "index"},
)
listCacheNumReturned = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_cache_list_returned_objects_total",
Help: "Number of objects returned for a LIST request from watch cache",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource_prefix"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
legacyregistry.MustRegister(listCacheCount)
legacyregistry.MustRegister(listCacheNumFetched)
legacyregistry.MustRegister(listCacheNumReturned)
})
}
// RecordListCacheMetrics notes various metrics of the cost to serve a LIST request
func RecordListCacheMetrics(resourcePrefix, indexName string, numFetched, numReturned int) {
listCacheCount.WithLabelValues(resourcePrefix, indexName).Inc()
listCacheNumFetched.WithLabelValues(resourcePrefix, indexName).Add(float64(numFetched))
listCacheNumReturned.WithLabelValues(resourcePrefix).Add(float64(numReturned))
}

View File

@ -449,12 +449,13 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
return nil return nil
} }
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects. // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) { // with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, string, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace) err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock() defer w.RUnlock()
if err != nil { if err != nil {
return nil, 0, err return nil, 0, "", err
} }
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
@ -463,10 +464,10 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues { for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, nil return result, w.resourceVersion, matchValue.IndexName, nil
} }
} }
return w.store.List(), w.resourceVersion, nil return w.store.List(), w.resourceVersion, "", nil
} }
// WaitUntilFreshAndGet returns a pointers to <storeElement> object. // WaitUntilFreshAndGet returns a pointers to <storeElement> object.

View File

@ -24,7 +24,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -357,7 +357,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}() }()
// list by empty MatchValues. // list by empty MatchValues.
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil, nil) list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(5, nil, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -367,13 +367,16 @@ func TestWaitUntilFreshAndList(t *testing.T) {
if len(list) != 3 { if len(list) != 3 {
t.Errorf("unexpected list returned: %#v", list) t.Errorf("unexpected list returned: %#v", list)
} }
if indexUsed != "" {
t.Errorf("Used index %q but expected none to be used", indexUsed)
}
// list by label index. // list by label index.
matchValues := []storage.MatchValue{ matchValues := []storage.MatchValue{
{IndexName: "l:label", Value: "value1"}, {IndexName: "l:label", Value: "value1"},
{IndexName: "f:spec.nodeName", Value: "node2"}, {IndexName: "f:spec.nodeName", Value: "node2"},
} }
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -383,13 +386,16 @@ func TestWaitUntilFreshAndList(t *testing.T) {
if len(list) != 2 { if len(list) != 2 {
t.Errorf("unexpected list returned: %#v", list) t.Errorf("unexpected list returned: %#v", list)
} }
if indexUsed != "l:label" {
t.Errorf("Used index %q but expected %q", indexUsed, "l:label")
}
// list with spec.nodeName index. // list with spec.nodeName index.
matchValues = []storage.MatchValue{ matchValues = []storage.MatchValue{
{IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "l:not-exist-label", Value: "whatever"},
{IndexName: "f:spec.nodeName", Value: "node2"}, {IndexName: "f:spec.nodeName", Value: "node2"},
} }
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -399,12 +405,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
if len(list) != 1 { if len(list) != 1 {
t.Errorf("unexpected list returned: %#v", list) t.Errorf("unexpected list returned: %#v", list)
} }
if indexUsed != "f:spec.nodeName" {
t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName")
}
// list with index not exists. // list with index not exists.
matchValues = []storage.MatchValue{ matchValues = []storage.MatchValue{
{IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "l:not-exist-label", Value: "whatever"},
} }
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -414,6 +423,9 @@ func TestWaitUntilFreshAndList(t *testing.T) {
if len(list) != 3 { if len(list) != 3 {
t.Errorf("unexpected list returned: %#v", list) t.Errorf("unexpected list returned: %#v", list)
} }
if indexUsed != "" {
t.Errorf("Used index %q but expected none to be used", indexUsed)
}
} }
func TestWaitUntilFreshAndGet(t *testing.T) { func TestWaitUntilFreshAndGet(t *testing.T) {
@ -459,7 +471,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store.Add(makeTestPod("bar", 5)) store.Add(makeTestPod("bar", 5))
}() }()
_, _, err := store.WaitUntilFreshAndList(5, nil, nil) _, _, _, err := store.WaitUntilFreshAndList(5, nil, nil)
if !errors.IsTimeout(err) { if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err) t.Errorf("expected timeout error but got: %v", err)
} }
@ -484,7 +496,7 @@ func TestReflectorForWatchCache(t *testing.T) {
store := newTestWatchCache(5, &cache.Indexers{}) store := newTestWatchCache(5, &cache.Indexers{})
{ {
_, version, err := store.WaitUntilFreshAndList(0, nil, nil) _, version, _, err := store.WaitUntilFreshAndList(0, nil, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -507,7 +519,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(wait.NeverStop) r.ListAndWatch(wait.NeverStop)
{ {
_, version, err := store.WaitUntilFreshAndList(10, nil, nil) _, version, _, err := store.WaitUntilFreshAndList(10, nil, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -85,6 +85,38 @@ var (
}, },
[]string{}, []string{},
) )
listStorageCount = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_total",
Help: "Number of LIST requests served from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
listStorageNumFetched = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_fetched_objects_total",
Help: "Number of objects read from storage in the course of serving a LIST request",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
listStorageNumSelectorEvals = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_evaluated_objects_total",
Help: "Number of objects tested in the course of serving a LIST request from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
listStorageNumReturned = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_returned_objects_total",
Help: "Number of objects returned for a LIST request from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -99,6 +131,10 @@ func Register() {
legacyregistry.MustRegister(dbTotalSize) legacyregistry.MustRegister(dbTotalSize)
legacyregistry.MustRegister(etcdBookmarkCounts) legacyregistry.MustRegister(etcdBookmarkCounts)
legacyregistry.MustRegister(etcdLeaseObjectCounts) legacyregistry.MustRegister(etcdLeaseObjectCounts)
legacyregistry.MustRegister(listStorageCount)
legacyregistry.MustRegister(listStorageNumFetched)
legacyregistry.MustRegister(listStorageNumSelectorEvals)
legacyregistry.MustRegister(listStorageNumReturned)
}) })
} }
@ -139,3 +175,11 @@ func UpdateLeaseObjectCount(count int64) {
// See pkg/storage/etcd3/lease_manager.go // See pkg/storage/etcd3/lease_manager.go
etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count)) etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count))
} }
// RecordListEtcd3Metrics notes various metrics of the cost to serve a LIST request
func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned int) {
listStorageCount.WithLabelValues(resource).Inc()
listStorageNumFetched.WithLabelValues(resource).Add(float64(numFetched))
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald))
listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned))
}

View File

@ -65,15 +65,16 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
var _ value.Context = authenticatedDataString("") var _ value.Context = authenticatedDataString("")
type store struct { type store struct {
client *clientv3.Client client *clientv3.Client
codec runtime.Codec codec runtime.Codec
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer transformer value.Transformer
pathPrefix string pathPrefix string
groupResource schema.GroupResource groupResource schema.GroupResource
watcher *watcher groupResourceString string
pagingEnabled bool watcher *watcher
leaseManager *leaseManager pagingEnabled bool
leaseManager *leaseManager
} }
type objState struct { type objState struct {
@ -100,10 +101,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
// for compatibility with etcd2 impl. // for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'. // no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix), pathPrefix: path.Join("/", prefix),
groupResource: groupResource, groupResource: groupResource,
watcher: newWatcher(c, codec, newFunc, versioner, transformer), groupResourceString: groupResource.String(),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
} }
return result return result
} }
@ -727,6 +729,14 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
var lastKey []byte var lastKey []byte
var hasMore bool var hasMore bool
var getResp *clientv3.GetResponse var getResp *clientv3.GetResponse
var numFetched int
var numEvald int
// Because these metrics are for understanding the costs of handling LIST requests,
// get them recorded even in error cases.
defer func() {
numReturn := v.Len()
metrics.RecordStorageListMetrics(s.groupResourceString, numFetched, numEvald, numReturn)
}()
for { for {
startTime := time.Now() startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...) getResp, err = s.client.KV.Get(ctx, key, options...)
@ -734,6 +744,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
if err != nil { if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
} }
numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err return err
} }
@ -767,6 +778,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
return err return err
} }
numEvald++
} }
// indicate to the client which resource version was returned // indicate to the client which resource version was returned

1
vendor/modules.txt vendored
View File

@ -1534,6 +1534,7 @@ k8s.io/apiserver/pkg/server/routes
k8s.io/apiserver/pkg/server/storage k8s.io/apiserver/pkg/server/storage
k8s.io/apiserver/pkg/storage k8s.io/apiserver/pkg/storage
k8s.io/apiserver/pkg/storage/cacher k8s.io/apiserver/pkg/storage/cacher
k8s.io/apiserver/pkg/storage/cacher/metrics
k8s.io/apiserver/pkg/storage/errors k8s.io/apiserver/pkg/storage/errors
k8s.io/apiserver/pkg/storage/etcd3 k8s.io/apiserver/pkg/storage/etcd3
k8s.io/apiserver/pkg/storage/etcd3/metrics k8s.io/apiserver/pkg/storage/etcd3/metrics