diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index da9a870b3a1..3983c92d013 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -56,6 +56,7 @@ func StorageWithCacher() generic.StorageDecorator { cacherConfig := cacherstorage.Config{ Storage: s, Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, ResourcePrefix: resourcePrefix, KeyFunc: keyFunc, NewFunc: newFunc, diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index de1fdb6373e..1c0b2cc6a76 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2318,6 +2318,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE config := cacherstorage.Config{ Storage: s, Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: podPrefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, GetAttrsFunc: getPodAttrs, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 24dd64d9e66..745d51dd8e4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -69,6 +70,9 @@ type Config struct { // An underlying storage.Versioner. Versioner storage.Versioner + // The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging. + GroupResource schema.GroupResource + // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. ResourcePrefix string @@ -146,13 +150,13 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool } } -func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) { +func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done func(*cacheWatcher)) { // note that we don't have to call setDrainInputBufferLocked method on the watchers // because we take advantage of the default value - stop immediately // also watchers that have had already its draining strategy set // are no longer available (they were removed from the allWatchers and the valueWatchers maps) if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 { - klog.Warningf("Terminating all watchers from cacher %v", objectType) + klog.Warningf("Terminating all watchers from cacher %v", groupResource) } i.allWatchers.terminateAll(done) for _, watchers := range i.valueWatchers { @@ -258,6 +262,8 @@ type Cacher struct { // Expected type of objects in the underlying cache. objectType reflect.Type + // Used for logging, to disambiguate *unstructured.Unstructured (CRDs) + groupResource schema.GroupResource // "sliding window" of recent changes of objects and the current state. watchCache *watchCache @@ -347,6 +353,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { ready: newReady(), storage: config.Storage, objectType: objType, + groupResource: config.GroupResource, versioner: config.Versioner, newFunc: config.NewFunc, indexedTrigger: indexedTrigger, @@ -422,7 +429,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { c.watchCache.SetOnReplace(func() { successfulList = true c.ready.set(true) - klog.V(1).Infof("cacher (%v): initialized", c.objectType.String()) + klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String()) metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc() }) defer func() { @@ -437,7 +444,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { // Also note that startCaching is called in a loop, so there's no need // to have another loop here. if err := c.reflector.ListAndWatch(stopChannel); err != nil { - klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err) + klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err) } } @@ -508,7 +515,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // given that memory allocation may trigger GC and block the thread. // Also note that emptyFunc is a placeholder, until we will be able // to compute watcher.forget function (which has to happen under lock). - watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier) + watcher := newCacheWatcher( + chanSize, + filterWithAttrsFunction(key, pred), + emptyFunc, + c.versioner, + deadline, + pred.AllowWatchBookmarks, + c.groupResource, + identifier, + ) // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -654,7 +670,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio trace := utiltrace.New("cacher list", utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)}, - utiltrace.Field{Key: "type", Value: c.objectType.String()}) + utiltrace.Field{Key: "type", Value: c.groupResource.String()}) defer trace.LogIfLong(500 * time.Millisecond) if err := c.ready.wait(); err != nil { @@ -757,7 +773,7 @@ func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bo func (c *Cacher) processEvent(event *watchCacheEvent) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { // Monitor if this gets backed up, and how much. - klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) + klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen) } c.incoming <- *event } @@ -1008,7 +1024,7 @@ func (c *Cacher) finishDispatching() { func (c *Cacher) terminateAllWatchers() { c.Lock() defer c.Unlock() - c.watchers.terminateAll(c.objectType, c.stopWatcherLocked) + c.watchers.terminateAll(c.groupResource, c.stopWatcherLocked) } func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) { @@ -1176,8 +1192,7 @@ type cacheWatcher struct { // save it here to send bookmark events before that. deadline time.Time allowWatchBookmarks bool - // Object type of the cache watcher interests - objectType reflect.Type + groupResource schema.GroupResource // human readable identifier that helps assigning cacheWatcher // instance with request @@ -1188,7 +1203,16 @@ type cacheWatcher struct { drainInputBuffer bool } -func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher { +func newCacheWatcher( + chanSize int, + filter filterWithAttrsFunc, + forget func(bool), + versioner storage.Versioner, + deadline time.Time, + allowWatchBookmarks bool, + groupResource schema.GroupResource, + identifier string, +) *cacheWatcher { return &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -1199,7 +1223,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool) versioner: versioner, deadline: deadline, allowWatchBookmarks: allowWatchBookmarks, - objectType: objectType, + groupResource: groupResource, identifier: identifier, } } @@ -1256,7 +1280,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. - klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result)) + klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result)) metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() c.forget(false) } @@ -1461,7 +1485,7 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { - klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime) + klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime) } c.process(ctx, resourceVersion) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 883571e37d6..3450baebfd0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/diff" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -74,7 +75,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { @@ -194,7 +195,7 @@ TestCase: testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0) ch := w.ResultChan() @@ -266,6 +267,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { config := Config{ Storage: s, Versioner: testVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: storage.DefaultNamespaceScopedAttr, @@ -537,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.Stop() select { case <-done: @@ -549,7 +551,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { deadline := time.Now().Add(time.Hour) // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") + w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() @@ -674,7 +676,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) { forget := func(bool) {} newWatcher := func(deadline time.Time) *cacheWatcher { - return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "") + return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") } clock := testingclock.NewFakeClock(time.Now()) @@ -1616,7 +1618,7 @@ func TestCacheWatcherDraining(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") @@ -1657,7 +1659,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") + w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 0b036c36809..0966d234a99 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -121,6 +121,7 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor config := cacherstorage.Config{ Storage: s, Versioner: v, + GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: GetAttrs,