diff --git a/staging/src/k8s.io/apiserver/pkg/server/routes/metrics.go b/staging/src/k8s.io/apiserver/pkg/server/routes/metrics.go index 1121e95c36a..d30f74b9c41 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/routes/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/server/routes/metrics.go @@ -19,6 +19,7 @@ package routes import ( apimetrics "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/server/mux" + cachermetrics "k8s.io/apiserver/pkg/storage/cacher/metrics" etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics" flowcontrolmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" "k8s.io/component-base/metrics/legacyregistry" @@ -46,6 +47,7 @@ func (m MetricsWithReset) Install(c *mux.PathRecorderMux) { // register apiserver and etcd metrics func register() { apimetrics.Register() + cachermetrics.Register() etcd3metrics.Register() flowcontrolmetrics.Register() } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 2053c818066..a47d92bc5dd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher/metrics" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/tools/cache" @@ -191,7 +192,7 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { if bucketID < t.startBucketID { bucketID = t.startBucketID } - watchers, _ := t.watchersBuckets[bucketID] + watchers := t.watchersBuckets[bucketID] t.watchersBuckets[bucketID] = append(watchers, w) return true } @@ -231,6 +232,8 @@ type Cacher struct { // Incoming events that should be dispatched to watchers. 
incoming chan watchCacheEvent + resourcePrefix string + sync.RWMutex // Before accessing the cacher's cache, wait for the ready to be ok. @@ -329,6 +332,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } objType := reflect.TypeOf(obj) cacher := &Cacher{ + resourcePrefix: config.ResourcePrefix, ready: newReady(), storage: config.Storage, objectType: objType, @@ -718,7 +722,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, } filter := filterWithAttrsFunction(key, pred) - objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) + objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) if err != nil { return err } @@ -744,6 +748,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, return err } } + metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len()) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/OWNERS b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/OWNERS new file mode 100644 index 00000000000..b26e7a4dc7e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-instrumentation-approvers +reviewers: + - sig-instrumentation-reviewers +labels: + - sig/instrumentation diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go new file mode 100644 index 00000000000..652730332e1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go @@ -0,0 +1,78 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + compbasemetrics "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +/* + * By default, all the following metrics are defined as falling under + * ALPHA stability level (https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes) + * + * Promoting the stability level of the metric is a responsibility of the component owner, since it + * involves explicitly acknowledging support for the metric across multiple releases, in accordance with + * the metric stability policy. 
+ */ +var ( + listCacheCount = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_cache_list_total", + Help: "Number of LIST requests served from watch cache", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource_prefix", "index"}, + ) + listCacheNumFetched = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_cache_list_fetched_objects_total", + Help: "Number of objects read from watch cache in the course of serving a LIST request", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource_prefix", "index"}, + ) + listCacheNumReturned = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_cache_list_returned_objects_total", + Help: "Number of objects returned for a LIST request from watch cache", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource_prefix"}, + ) +) + +var registerMetrics sync.Once + +// Register all metrics. +func Register() { + // Register the metrics. 
+ registerMetrics.Do(func() { + legacyregistry.MustRegister(listCacheCount) + legacyregistry.MustRegister(listCacheNumFetched) + legacyregistry.MustRegister(listCacheNumReturned) + }) +} + +// RecordListCacheMetrics notes various metrics of the cost to serve a LIST request +func RecordListCacheMetrics(resourcePrefix, indexName string, numFetched, numReturned int) { + listCacheCount.WithLabelValues(resourcePrefix, indexName).Inc() + listCacheNumFetched.WithLabelValues(resourcePrefix, indexName).Add(float64(numFetched)) + listCacheNumReturned.WithLabelValues(resourcePrefix).Add(float64(numReturned)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 65f002e6fdd..120e91f1091 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -449,12 +449,13 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt return nil } -// WaitUntilFreshAndList returns list of pointers to objects. -func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) { +// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along +// with their ResourceVersion and the name of the index, if any, that was used. +func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, string, error) { err := w.waitUntilFreshAndBlock(resourceVersion, trace) defer w.RUnlock() if err != nil { - return nil, 0, err + return nil, 0, "", err } // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. 
So the only @@ -463,10 +464,10 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [ // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. for _, matchValue := range matchValues { if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { - return result, w.resourceVersion, nil + return result, w.resourceVersion, matchValue.IndexName, nil } } - return w.store.List(), w.resourceVersion, nil + return w.store.List(), w.resourceVersion, "", nil } // WaitUntilFreshAndGet returns a pointers to object. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 4812650b762..21c4c573a1b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -357,7 +357,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { }() // list by empty MatchValues. - list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil, nil) + list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(5, nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -367,13 +367,16 @@ func TestWaitUntilFreshAndList(t *testing.T) { if len(list) != 3 { t.Errorf("unexpected list returned: %#v", list) } + if indexUsed != "" { + t.Errorf("Used index %q but expected none to be used", indexUsed) + } // list by label index. 
matchValues := []storage.MatchValue{ {IndexName: "l:label", Value: "value1"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -383,13 +386,16 @@ func TestWaitUntilFreshAndList(t *testing.T) { if len(list) != 2 { t.Errorf("unexpected list returned: %#v", list) } + if indexUsed != "l:label" { + t.Errorf("Used index %q but expected %q", indexUsed, "l:label") + } // list with spec.nodeName index. matchValues = []storage.MatchValue{ {IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -399,12 +405,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { if len(list) != 1 { t.Errorf("unexpected list returned: %#v", list) } + if indexUsed != "f:spec.nodeName" { + t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName") + } // list with index not exists. 
matchValues = []storage.MatchValue{ {IndexName: "l:not-exist-label", Value: "whatever"}, } - list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil) + list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(5, matchValues, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -414,6 +423,9 @@ func TestWaitUntilFreshAndList(t *testing.T) { if len(list) != 3 { t.Errorf("unexpected list returned: %#v", list) } + if indexUsed != "" { + t.Errorf("Used index %q but expected none to be used", indexUsed) + } } func TestWaitUntilFreshAndGet(t *testing.T) { @@ -459,7 +471,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { store.Add(makeTestPod("bar", 5)) }() - _, _, err := store.WaitUntilFreshAndList(5, nil, nil) + _, _, _, err := store.WaitUntilFreshAndList(5, nil, nil) if !errors.IsTimeout(err) { t.Errorf("expected timeout error but got: %v", err) } @@ -484,7 +496,7 @@ func TestReflectorForWatchCache(t *testing.T) { store := newTestWatchCache(5, &cache.Indexers{}) { - _, version, err := store.WaitUntilFreshAndList(0, nil, nil) + _, version, _, err := store.WaitUntilFreshAndList(0, nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -507,7 +519,7 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(wait.NeverStop) { - _, version, err := store.WaitUntilFreshAndList(10, nil, nil) + _, version, _, err := store.WaitUntilFreshAndList(10, nil, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index c10eb273e67..dc9917cef91 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -85,6 +85,38 @@ var ( }, []string{}, ) + listStorageCount = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_storage_list_total", + Help: 
"Number of LIST requests served from storage", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) + listStorageNumFetched = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_storage_list_fetched_objects_total", + Help: "Number of objects read from storage in the course of serving a LIST request", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) + listStorageNumSelectorEvals = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_storage_list_evaluated_objects_total", + Help: "Number of objects tested in the course of serving a LIST request from storage", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) + listStorageNumReturned = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "apiserver_storage_list_returned_objects_total", + Help: "Number of objects returned for a LIST request from storage", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) ) var registerMetrics sync.Once @@ -99,6 +131,10 @@ func Register() { legacyregistry.MustRegister(dbTotalSize) legacyregistry.MustRegister(etcdBookmarkCounts) legacyregistry.MustRegister(etcdLeaseObjectCounts) + legacyregistry.MustRegister(listStorageCount) + legacyregistry.MustRegister(listStorageNumFetched) + legacyregistry.MustRegister(listStorageNumSelectorEvals) + legacyregistry.MustRegister(listStorageNumReturned) }) } @@ -139,3 +175,11 @@ func UpdateLeaseObjectCount(count int64) { // See pkg/storage/etcd3/lease_manager.go etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count)) } + +// RecordStorageListMetrics notes various metrics of the cost to serve a LIST request +func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned int) { + listStorageCount.WithLabelValues(resource).Inc() + listStorageNumFetched.WithLabelValues(resource).Add(float64(numFetched)) + 
listStorageNumSelectorEvals.WithLabelValues(resource).Add(float64(numEvald)) + listStorageNumReturned.WithLabelValues(resource).Add(float64(numReturned)) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2129c333d97..c707fa75939 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -65,15 +65,16 @@ func (d authenticatedDataString) AuthenticatedData() []byte { var _ value.Context = authenticatedDataString("") type store struct { - client *clientv3.Client - codec runtime.Codec - versioner storage.Versioner - transformer value.Transformer - pathPrefix string - groupResource schema.GroupResource - watcher *watcher - pagingEnabled bool - leaseManager *leaseManager + client *clientv3.Client + codec runtime.Codec + versioner storage.Versioner + transformer value.Transformer + pathPrefix string + groupResource schema.GroupResource + groupResourceString string + watcher *watcher + pagingEnabled bool + leaseManager *leaseManager } type objState struct { @@ -100,10 +101,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. 
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' - pathPrefix: path.Join("/", prefix), - groupResource: groupResource, - watcher: newWatcher(c, codec, newFunc, versioner, transformer), - leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), + pathPrefix: path.Join("/", prefix), + groupResource: groupResource, + groupResourceString: groupResource.String(), + watcher: newWatcher(c, codec, newFunc, versioner, transformer), + leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } return result } @@ -727,6 +729,14 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, var lastKey []byte var hasMore bool var getResp *clientv3.GetResponse + var numFetched int + var numEvald int + // Because these metrics are for understanding the costs of handling LIST requests, + // get them recorded even in error cases. + defer func() { + numReturn := v.Len() + metrics.RecordStorageListMetrics(s.groupResourceString, numFetched, numEvald, numReturn) + }() for { startTime := time.Now() getResp, err = s.client.KV.Get(ctx, key, options...) 
@@ -734,6 +744,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, if err != nil { return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) } + numFetched += len(getResp.Kvs) if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { return err } @@ -767,6 +778,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil { return err } + numEvald++ } // indicate to the client which resource version was returned diff --git a/vendor/modules.txt b/vendor/modules.txt index 1156667d18e..18b3f39ef90 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1534,6 +1534,7 @@ k8s.io/apiserver/pkg/server/routes k8s.io/apiserver/pkg/server/storage k8s.io/apiserver/pkg/storage k8s.io/apiserver/pkg/storage/cacher +k8s.io/apiserver/pkg/storage/cacher/metrics k8s.io/apiserver/pkg/storage/errors k8s.io/apiserver/pkg/storage/etcd3 k8s.io/apiserver/pkg/storage/etcd3/metrics