diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 745d51dd8e4..ac5563ece45 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -70,7 +70,8 @@ type Config struct { // An underlying storage.Versioner. Versioner storage.Versioner - // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging. + // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging + // and metrics. GroupResource schema.GroupResource // The Cache will be caching objects of a given Type and assumes that they @@ -384,7 +385,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix @@ -430,7 +431,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { successfulList = true c.ready.set(true) klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String()) - metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc() + metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc() }) defer func() { if successfulList { @@ -804,7 +805,7 @@ func (c *Cacher) dispatchEvents() { c.dispatchEvent(&event) } lastProcessedResourceVersion = event.ResourceVersion - metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc() + metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc() case <-bookmarkTimer.C(): bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) // Never send a bookmark event if we did not see an event here, this is fine @@ -1281,7 +1282,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // Since we don't want to block on it infinitely, // we simply terminate it. klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result)) - metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() + metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc() c.forget(false) } @@ -1479,9 +1480,8 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch initEventCount++ } - objType := c.objectType.String() if initEventCount > 0 { - metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount)) + metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount)) } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 3450baebfd0..c15ff5e8645 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -48,10 +48,6 @@ import ( testingclock "k8s.io/utils/clock/testing" ) -var ( - objectType = reflect.TypeOf(&v1.Pod{}) -) - // verifies the cacheWatcher.process goroutine is properly cleaned up even if // the writes to cacheWatcher.result channel is blocked. func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 9deff40704e..2bd4284e5b8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -19,7 +19,6 @@ package cacher import ( "fmt" "math" - "reflect" "sort" "sync" "time" @@ -28,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/cacher/metrics" @@ -189,8 +189,8 @@ type watchCache struct { // An underlying storage.Versioner. versioner storage.Versioner - // cacher's objectType. - objectType reflect.Type + // cacher's group resource + groupResource schema.GroupResource // For testing cache interval invalidation. indexValidator indexValidator @@ -203,7 +203,7 @@ func newWatchCache( versioner storage.Versioner, indexers *cache.Indexers, clock clock.Clock, - objectType reflect.Type) *watchCache { + groupResource schema.GroupResource) *watchCache { wc := &watchCache{ capacity: defaultLowerBoundCapacity, keyFunc: keyFunc, @@ -219,10 +219,9 @@ func newWatchCache( eventHandler: eventHandler, clock: clock, versioner: versioner, - objectType: objectType, + groupResource: groupResource, } - objType := objectType.String() - metrics.WatchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity)) + metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity)) wc.cond = sync.NewCond(wc.RLocker()) wc.indexValidator = wc.isIndexValidLocked @@ -387,7 +386,7 @@ func (w *watchCache) doCacheResizeLocked(capacity int) { newCache[i%capacity] = w.cache[i%w.capacity] } w.cache = newCache - metrics.RecordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity) + metrics.RecordsWatchCacheCapacityChange(w.groupResource.String(), w.capacity, capacity) w.capacity = capacity } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index df6f9c0088b..6200fc6e566 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -18,7 +18,6 @@ package cacher import ( "fmt" - "reflect" "strconv" "strings" "testing" @@ -31,9 +30,9 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/tools/cache" testingclock "k8s.io/utils/clock/testing" @@ -112,7 +111,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { } versioner := storage.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} - wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{})) + wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity