From 397533a4c2df9639ff4422c907d06fae195a1835 Mon Sep 17 00:00:00 2001
From: Andy Goldstein
Date: Thu, 11 Aug 2022 15:45:04 -0400
Subject: [PATCH 1/2] watch cache: log GroupResource, not objectType

All CustomResources are treated as *unstructured.Unstructured, leading
the watch cache to log anything related to CRs as Unstructured. This
change uses the schema.GroupResource instead of the object type for all
type-related log messages in the watch cache, resulting in distinct
output for each CR type.

Signed-off-by: Andy Goldstein
---
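Note (below the fold, so `git am` ignores it): the ambiguity this commit
removes is easy to reproduce in isolation. Every custom resource is served
as the same Go type, so a reflect.Type-based log key cannot tell two CRs
apart, while schema.GroupResource can. A minimal sketch — the
widgets/gadgets group resources are invented examples, not taken from this
patch:

	package main

	import (
		"fmt"
		"reflect"

		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
		"k8s.io/apimachinery/pkg/runtime/schema"
	)

	func main() {
		// Two different CRs, one Go type: type-keyed log lines are identical.
		widget := &unstructured.Unstructured{}
		gadget := &unstructured.Unstructured{}
		fmt.Println(reflect.TypeOf(widget)) // *unstructured.Unstructured
		fmt.Println(reflect.TypeOf(gadget)) // *unstructured.Unstructured

		// GroupResource keeps them distinct in log output.
		fmt.Println(schema.GroupResource{Group: "example.com", Resource: "widgets"}) // widgets.example.com
		fmt.Println(schema.GroupResource{Group: "example.com", Resource: "gadgets"}) // gadgets.example.com
	}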
 .../generic/registry/storage_factory.go      |  1 +
 .../registry/generic/registry/store_test.go  |  1 +
 .../apiserver/pkg/storage/cacher/cacher.go   | 52 ++++++++++++++-----
 .../storage/cacher/cacher_whitebox_test.go   | 16 +++---
 .../pkg/storage/tests/cacher_test.go         |  1 +
 5 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
index da9a870b3a1..3983c92d013 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
@@ -56,6 +56,7 @@ func StorageWithCacher() generic.StorageDecorator {
 		cacherConfig := cacherstorage.Config{
 			Storage:        s,
 			Versioner:      storage.APIObjectVersioner{},
+			GroupResource:  storageConfig.GroupResource,
 			ResourcePrefix: resourcePrefix,
 			KeyFunc:        keyFunc,
 			NewFunc:        newFunc,
diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
index de1fdb6373e..1c0b2cc6a76 100644
--- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go
@@ -2318,6 +2318,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
 	config := cacherstorage.Config{
 		Storage:        s,
 		Versioner:      storage.APIObjectVersioner{},
+		GroupResource:  schema.GroupResource{Resource: "pods"},
 		ResourcePrefix: podPrefix,
 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
 		GetAttrsFunc:   getPodAttrs,
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 24dd64d9e66..745d51dd8e4 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
@@ -69,6 +70,9 @@ type Config struct {
 	// An underlying storage.Versioner.
 	Versioner storage.Versioner

+	// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging.
+	GroupResource schema.GroupResource
+
 	// The Cache will be caching objects of a given Type and assumes that they
 	// are all stored under ResourcePrefix directory in the underlying database.
 	ResourcePrefix string
@@ -146,13 +150,13 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
 	}
 }

-func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
+func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done func(*cacheWatcher)) {
 	// note that we don't have to call setDrainInputBufferLocked method on the watchers
 	// because we take advantage of the default value - stop immediately
 	// also watchers that have already had their draining strategy set
 	// are no longer available (they were removed from the allWatchers and the valueWatchers maps)
 	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
-		klog.Warningf("Terminating all watchers from cacher %v", objectType)
+		klog.Warningf("Terminating all watchers from cacher %v", groupResource)
 	}
 	i.allWatchers.terminateAll(done)
 	for _, watchers := range i.valueWatchers {
@@ -258,6 +262,8 @@ type Cacher struct {
 	// Expected type of objects in the underlying cache.
 	objectType reflect.Type
+	// Used for logging, to disambiguate *unstructured.Unstructured (CRDs)
+	groupResource schema.GroupResource

 	// "sliding window" of recent changes of objects and the current state.
 	watchCache *watchCache
@@ -347,6 +353,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 		ready:          newReady(),
 		storage:        config.Storage,
 		objectType:     objType,
+		groupResource:  config.GroupResource,
 		versioner:      config.Versioner,
 		newFunc:        config.NewFunc,
 		indexedTrigger: indexedTrigger,
@@ -422,7 +429,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	c.watchCache.SetOnReplace(func() {
 		successfulList = true
 		c.ready.set(true)
-		klog.V(1).Infof("cacher (%v): initialized", c.objectType.String())
+		klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
 		metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
 	})
 	defer func() {
@@ -437,7 +444,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	// Also note that startCaching is called in a loop, so there's no need
 	// to have another loop here.
 	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
-		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
+		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
 	}
 }
@@ -508,7 +515,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// given that memory allocation may trigger GC and block the thread.
 	// Also note that emptyFunc is a placeholder, until we will be able
 	// to compute watcher.forget function (which has to happen under lock).
-	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
+	watcher := newCacheWatcher(
+		chanSize,
+		filterWithAttrsFunction(key, pred),
+		emptyFunc,
+		c.versioner,
+		deadline,
+		pred.AllowWatchBookmarks,
+		c.groupResource,
+		identifier,
+	)

 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
@@ -654,7 +670,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio

 	trace := utiltrace.New("cacher list",
 		utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
-		utiltrace.Field{Key: "type", Value: c.objectType.String()})
+		utiltrace.Field{Key: "type", Value: c.groupResource.String()})
 	defer trace.LogIfLong(500 * time.Millisecond)

 	if err := c.ready.wait(); err != nil {
@@ -757,7 +773,7 @@ func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bo
 func (c *Cacher) processEvent(event *watchCacheEvent) {
 	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
 		// Monitor if this gets backed up, and how much.
-		klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
+		klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen)
 	}
 	c.incoming <- *event
 }
@@ -1008,7 +1024,7 @@ func (c *Cacher) finishDispatching() {
 func (c *Cacher) terminateAllWatchers() {
 	c.Lock()
 	defer c.Unlock()
-	c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
+	c.watchers.terminateAll(c.groupResource, c.stopWatcherLocked)
 }

 func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
@@ -1176,8 +1192,7 @@ type cacheWatcher struct {
 	// save it here to send bookmark events before that.
 	deadline            time.Time
 	allowWatchBookmarks bool
-	// Object type of the cache watcher interests
-	objectType reflect.Type
+	groupResource schema.GroupResource

 	// human readable identifier that helps assigning cacheWatcher
 	// instance with request
@@ -1188,7 +1203,16 @@ type cacheWatcher struct {
 	drainInputBuffer bool
 }

-func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher {
+func newCacheWatcher(
+	chanSize int,
+	filter filterWithAttrsFunc,
+	forget func(bool),
+	versioner storage.Versioner,
+	deadline time.Time,
+	allowWatchBookmarks bool,
+	groupResource schema.GroupResource,
+	identifier string,
+) *cacheWatcher {
 	return &cacheWatcher{
 		input:  make(chan *watchCacheEvent, chanSize),
 		result: make(chan watch.Event, chanSize),
@@ -1199,7 +1223,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool)
 		versioner:           versioner,
 		deadline:            deadline,
 		allowWatchBookmarks: allowWatchBookmarks,
-		objectType:          objectType,
+		groupResource:       groupResource,
 		identifier:          identifier,
 	}
 }
@@ -1256,7 +1280,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 		// This means that we couldn't send event to that watcher.
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
-		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result))
+		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
 		metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
 		c.forget(false)
 	}
@@ -1461,7 +1485,7 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
 	}
 	processingTime := time.Since(startTime)
 	if processingTime > initProcessThreshold {
-		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime)
+		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
 	}

 	c.process(ctx, resourceVersion)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 883571e37d6..3450baebfd0 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/util/diff"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -74,7 +75,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
+	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
@@ -194,7 +195,7 @@ TestCase:
 			testCase.events[j].ResourceVersion = uint64(j) + 1
 		}

-		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
+		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 		go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)

 		ch := w.ResultChan()
@@ -266,6 +267,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 	config := Config{
 		Storage:        s,
 		Versioner:      testVersioner{},
+		GroupResource:  schema.GroupResource{Resource: "pods"},
 		ResourcePrefix: prefix,
 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 		GetAttrsFunc:   storage.DefaultNamespaceScopedAttr,
@@ -537,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 	// timeout to zero and run the Stop goroutine concurrently.
 	// Make sure that the watch will not be blocked on Stop.
 	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
-		w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "")
+		w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
 		go w.Stop()
 		select {
 		case <-done:
@@ -549,7 +551,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {

 	deadline := time.Now().Add(time.Hour)
 	// After that, verifies the cacheWatcher.process goroutine works correctly.
 	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
-		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
+		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
 		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
 		ctx, cancel := context.WithDeadline(context.Background(), deadline)
 		defer cancel()
@@ -674,7 +676,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
 	forget := func(bool) {}

 	newWatcher := func(deadline time.Time) *cacheWatcher {
-		return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "")
+		return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
 	}

 	clock := testingclock.NewFakeClock(time.Now())
@@ -1616,7 +1618,7 @@ func TestCacheWatcherDraining(t *testing.T) {
 		makeWatchCacheEvent(5),
 		makeWatchCacheEvent(6),
 	}
-	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
+	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 		t.Fatal("failed adding an event to the watcher")
@@ -1657,7 +1659,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
 		makeWatchCacheEvent(5),
 		makeWatchCacheEvent(6),
 	}
-	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
+	w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
 	go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
 	if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
 		t.Fatal("failed adding an event to the watcher")
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
index 0b036c36809..0966d234a99 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go
@@ -121,6 +121,7 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
 	config := cacherstorage.Config{
 		Storage:        s,
 		Versioner:      v,
+		GroupResource:  schema.GroupResource{Resource: "pods"},
 		ResourcePrefix: prefix,
 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
 		GetAttrsFunc:   GetAttrs,

From d08b69e8d35a5aa73a178c508f9b0e1ad74b882d Mon Sep 17 00:00:00 2001
From: Andy Goldstein
Date: Thu, 11 Aug 2022 15:51:21 -0400
Subject: [PATCH 2/2] watch cache: metrics: objectType -> group resource

Use the group resource instead of the objectType in watch cache metrics;
otherwise all CustomResources are grouped together as
*unstructured.Unstructured instead of getting one entry per type.

Signed-off-by: Andy Goldstein
---
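Note (below the fold, so `git am` ignores it): the metrics problem mirrors
the logging one — a label derived from reflect.Type collapses every custom
resource into a single time series. A rough sketch of the before/after label
behavior, using prometheus/client_golang directly rather than the
k8s.io/apiserver metrics wrappers; the counter name and label here are
illustrative, not the real metric definitions:

	package main

	import (
		"fmt"
		"reflect"

		"github.com/prometheus/client_golang/prometheus"
		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
		"k8s.io/apimachinery/pkg/runtime/schema"
	)

	func main() {
		events := prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "watch_cache_events_total", Help: "illustrative only"},
			[]string{"resource"},
		)

		// Before: every CR increments the same "*unstructured.Unstructured" series.
		typeLabel := reflect.TypeOf(&unstructured.Unstructured{}).String()
		events.WithLabelValues(typeLabel).Inc()
		events.WithLabelValues(typeLabel).Inc()

		// After: one series per group resource.
		events.WithLabelValues(schema.GroupResource{Group: "example.com", Resource: "widgets"}.String()).Inc()
		events.WithLabelValues(schema.GroupResource{Group: "example.com", Resource: "gadgets"}.String()).Inc()

		fmt.Println(typeLabel) // *unstructured.Unstructured — identical for every CR
	}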
 .../k8s.io/apiserver/pkg/storage/cacher/cacher.go | 14 +++++++-------
 .../pkg/storage/cacher/cacher_whitebox_test.go    |  4 ----
 .../apiserver/pkg/storage/cacher/watch_cache.go   | 15 +++++++--------
 .../pkg/storage/cacher/watch_cache_test.go        |  5 ++---
 4 files changed, 16 insertions(+), 22 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 745d51dd8e4..ac5563ece45 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -70,7 +70,8 @@ type Config struct {
 	// An underlying storage.Versioner.
 	Versioner storage.Versioner

-	// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging.
+	// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging
+	// and metrics.
 	GroupResource schema.GroupResource

 	// The Cache will be caching objects of a given Type and assumes that they
@@ -384,7 +385,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}

 	watchCache := newWatchCache(
-		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
+		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
 	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@@ -430,7 +431,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 		successfulList = true
 		c.ready.set(true)
 		klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
-		metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
+		metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
 	})
 	defer func() {
 		if successfulList {
@@ -804,7 +805,7 @@ func (c *Cacher) dispatchEvents() {
 				c.dispatchEvent(&event)
 			}
 			lastProcessedResourceVersion = event.ResourceVersion
-			metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
+			metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
 		case <-bookmarkTimer.C():
 			bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
 			// Never send a bookmark event if we did not see an event here, this is fine
@@ -1281,7 +1282,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
 		klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
-		metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
+		metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
 		c.forget(false)
 	}
@@ -1479,9 +1480,8 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
 		initEventCount++
 	}

-	objType := c.objectType.String()
 	if initEventCount > 0 {
-		metrics.InitCounter.WithLabelValues(objType).Add(float64(initEventCount))
+		metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
 	}
 	processingTime := time.Since(startTime)
 	if processingTime > initProcessThreshold {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 3450baebfd0..c15ff5e8645 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -48,10 +48,6 @@ import (
 	testingclock "k8s.io/utils/clock/testing"
 )

-var (
-	objectType = reflect.TypeOf(&v1.Pod{})
-)
-
 // verifies the cacheWatcher.process goroutine is properly cleaned up even if
 // the writes to cacheWatcher.result channel is blocked.
 func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index 9deff40704e..2bd4284e5b8 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -19,7 +19,6 @@ package cacher
 import (
 	"fmt"
 	"math"
-	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -28,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/cacher/metrics"
@@ -189,8 +189,8 @@ type watchCache struct {
 	// An underlying storage.Versioner.
 	versioner storage.Versioner

-	// cacher's objectType.
-	objectType reflect.Type
+	// cacher's group resource
+	groupResource schema.GroupResource

 	// For testing cache interval invalidation.
 	indexValidator indexValidator
@@ -203,7 +203,7 @@ func newWatchCache(
 	versioner storage.Versioner,
 	indexers *cache.Indexers,
 	clock clock.Clock,
-	objectType reflect.Type) *watchCache {
+	groupResource schema.GroupResource) *watchCache {
 	wc := &watchCache{
 		capacity: defaultLowerBoundCapacity,
 		keyFunc:  keyFunc,
@@ -219,10 +219,9 @@ func newWatchCache(
 		eventHandler:  eventHandler,
 		clock:         clock,
 		versioner:     versioner,
-		objectType:    objectType,
+		groupResource: groupResource,
 	}
-	objType := objectType.String()
-	metrics.WatchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity))
+	metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
 	wc.cond = sync.NewCond(wc.RLocker())
 	wc.indexValidator = wc.isIndexValidLocked
@@ -387,7 +386,7 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
 		newCache[i%capacity] = w.cache[i%w.capacity]
 	}
 	w.cache = newCache
-	metrics.RecordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
+	metrics.RecordsWatchCacheCapacityChange(w.groupResource.String(), w.capacity, capacity)
 	w.capacity = capacity
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index df6f9c0088b..6200fc6e566 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -18,7 +18,6 @@ package cacher
 import (
 	"fmt"
-	"reflect"
 	"strconv"
 	"strings"
 	"testing"
@@ -31,9 +30,9 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/apiserver/pkg/apis/example"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/client-go/tools/cache"
 	testingclock "k8s.io/utils/clock/testing"
@@ -112,7 +111,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
 	}
 	versioner := storage.APIObjectVersioner{}
 	mockHandler := func(*watchCacheEvent) {}
-	wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), reflect.TypeOf(&example.Pod{}))
+	wc := newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"})
 	// To preserve behavior of tests that assume a given capacity,
 	// resize it to the expected size.
 	wc.capacity = capacity
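Note: after both commits, every log line and metric label in the watch cache
is keyed by schema.GroupResource.String(), which is why the test fixtures
above expect the plain string "pods" for a core resource. A small sketch of
the resulting keys — the apps and example.com entries are illustrative, not
taken from this series:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/runtime/schema"
	)

	func main() {
		// Core resources have an empty group, so the key is just the
		// resource, matching schema.GroupResource{Resource: "pods"} above.
		fmt.Println(schema.GroupResource{Resource: "pods"}) // pods

		// Grouped and custom resources render as "resource.group", giving
		// each CRD its own log/metric key instead of a shared one.
		fmt.Println(schema.GroupResource{Group: "apps", Resource: "deployments"})    // deployments.apps
		fmt.Println(schema.GroupResource{Group: "example.com", Resource: "widgets"}) // widgets.example.com
	}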