From 85359d74ed0fe2cdb8e2d41cb5a3b501a2c20ed8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 28 Feb 2023 11:16:13 +0100 Subject: [PATCH] Add metrics for number of events received from etcd --- .../pkg/storage/cacher/metrics/metrics.go | 12 ++++++++++++ .../apiserver/pkg/storage/cacher/watch_cache.go | 2 ++ .../apiserver/pkg/storage/etcd3/metrics/metrics.go | 14 ++++++++++++++ .../k8s.io/apiserver/pkg/storage/etcd3/watcher.go | 1 + 4 files changed, 29 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go index ffebf5e5b48..c780cc57ed4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go @@ -74,6 +74,17 @@ var ( []string{"resource"}, ) + EventsReceivedCounter = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "events_received_total", + Help: "Counter of events received in watch cache broken by resource type.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) + EventsCounter = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Namespace: namespace, @@ -147,6 +158,7 @@ func Register() { legacyregistry.MustRegister(listCacheNumFetched) legacyregistry.MustRegister(listCacheNumReturned) legacyregistry.MustRegister(InitCounter) + legacyregistry.MustRegister(EventsReceivedCounter) legacyregistry.MustRegister(EventsCounter) legacyregistry.MustRegister(TerminatedWatchersCounter) legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 3e67f513285..e719f6f0551 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -280,6 +280,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob // processEvent is safe as long as there is at most one call to it in flight // at any point in time. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { + metrics.EventsReceivedCounter.WithLabelValues(w.groupResource.String()).Inc() + key, err := w.keyFunc(event.Object) if err != nil { return fmt.Errorf("couldn't compute key: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index a01f50954fe..6f155c0adb2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -64,6 +64,15 @@ var ( }, []string{"endpoint"}, ) + etcdEventsReceivedCounts = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Subsystem: "apiserver", + Name: "storage_events_received_total", + Help: "Number of etcd events received split by kind.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"resource"}, + ) etcdBookmarkCounts = compbasemetrics.NewGaugeVec( &compbasemetrics.GaugeOpts{ Name: "etcd_bookmark_counts", @@ -153,6 +162,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) { etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime)) } +// RecordEtcdEvent updated the etcd_events_received_total metric. +func RecordEtcdEvent(resource string) { + etcdEventsReceivedCounts.WithLabelValues(resource).Inc() +} + // RecordEtcdBookmark updates the etcd_bookmark_counts metric. func RecordEtcdBookmark(resource string) { etcdBookmarkCounts.WithLabelValues(resource).Inc() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index c0b7be35c55..af4992c93f8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -256,6 +256,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { } for _, e := range wres.Events { + metrics.RecordEtcdEvent(wc.watcher.groupResource.String()) parsedEvent, err := parseEvent(e) if err != nil { logWatchChannelErr(err)