diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
index d0b3cdc9048..6de0ad1eecf 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go
@@ -146,6 +146,16 @@ var (
 		},
 		[]string{"resource"},
 	)
+
+	WatchCacheReadWait = compbasemetrics.NewHistogramVec( // WatchCacheReadWait is a per-resource histogram of the time, in seconds, spent waiting for a watch cache to become fresh.
+		&compbasemetrics.HistogramOpts{
+			Namespace:      namespace,
+			Subsystem:      subsystem,
+			Name:           "read_wait_seconds",
+			Help:           "Histogram of time spent waiting for a watch cache to become fresh.",
+			StabilityLevel: compbasemetrics.ALPHA,
+			Buckets:        []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3}, // bucket upper bounds in seconds, from 5ms up to 3s
+		}, []string{"resource"}) // single label: "resource"
 )
 
 var registerMetrics sync.Once
@@ -165,6 +175,7 @@ func Register() {
 		legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
 		legacyregistry.MustRegister(WatchCacheCapacity)
 		legacyregistry.MustRegister(WatchCacheInitializations)
+		legacyregistry.MustRegister(WatchCacheReadWait) // register the read-wait histogram alongside the other watch cache metrics
 	})
 }
 
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index cc797621b7a..bb875fe692d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -440,6 +440,11 @@ func (w *watchCache) List() []interface{} {
 // You HAVE TO explicitly call w.RUnlock() after this function.
 func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error {
 	startTime := w.clock.Now()
+	defer func() { // deferred so the wait time is observed on every return path of this function
+		if resourceVersion > 0 { // resourceVersion 0 accepts arbitrarily stale data, so those calls are not recorded
+			metrics.WatchCacheReadWait.WithContext(ctx).WithLabelValues(w.groupResource.String()).Observe(w.clock.Since(startTime).Seconds()) // elapsed time on w.clock since startTime, labeled with the cache's group resource
+		}
+	}()
 
 	// In case resourceVersion is 0, we accept arbitrarily stale result.
 	// As a result, the condition in the below for loop will never be
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index 8e37e0cf83e..752c607a083 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -36,9 +36,12 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/storage"
+	"k8s.io/apiserver/pkg/storage/cacher/metrics"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/cache"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	k8smetrics "k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/utils/clock"
 	testingclock "k8s.io/utils/clock/testing"
 )
@@ -1123,3 +1126,72 @@ func BenchmarkWatchCache_updateCache(b *testing.B) {
 		store.updateCache(add)
 	}
 }
+
+func TestHistogramCacheReadWait(t *testing.T) { // checks that WatchCacheReadWait gets one sample for a non-zero resourceVersion and none for resourceVersion 0
+	registry := k8smetrics.NewKubeRegistry() // dedicated registry that only has the metric under test registered
+	if err := registry.Register(metrics.WatchCacheReadWait); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	ctx := context.Background()
+	testedMetrics := "apiserver_watch_cache_read_wait_seconds" // metric name handed to GatherAndCompare below
+	store := newTestWatchCache(2, &cache.Indexers{})
+	defer store.Stop()
+
+	// In background, update the store.
+	go func() {
+		if err := store.Add(makeTestPod("foo", 2)); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if err := store.Add(makeTestPod("bar", 5)); err != nil { // presumably resourceVersion 5, the version the first case waits for — confirm against makeTestPod
+			t.Errorf("unexpected error: %v", err)
+		}
+	}()
+
+	testCases := []struct {
+		desc            string
+		resourceVersion uint64
+		want            string
+	}{
+		{
+			desc:            "resourceVersion is non-zero",
+			resourceVersion: 5, // NOTE(review): the expected sum of 0 below presumably relies on the test cache's clock not advancing — confirm
+			want: `
+				# HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh.
+				# TYPE apiserver_watch_cache_read_wait_seconds histogram
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.005"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.025"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.05"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.1"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.2"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.4"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.6"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="0.8"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="1"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="1.25"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="1.5"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="2"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="3"} 1
+				apiserver_watch_cache_read_wait_seconds_bucket{resource="pods",le="+Inf"} 1
+				apiserver_watch_cache_read_wait_seconds_sum{resource="pods"} 0
+				apiserver_watch_cache_read_wait_seconds_count{resource="pods"} 1
+`,
+		},
+		{
+			desc:            "resourceVersion is 0",
+			resourceVersion: 0,
+			want:            ``, // empty expectation: no sample should be recorded when resourceVersion is 0
+		},
+	}
+
+	for _, test := range testCases {
+		t.Run(test.desc, func(t *testing.T) {
+			defer registry.Reset() // presumably clears recorded samples so the next case starts clean — confirm Reset semantics
+			if _, _, _, err := store.WaitUntilFreshAndGet(ctx, test.resourceVersion, "prefix/ns/bar"); err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+		})
+	}
+}