watch cache: log GroupResource, not objectType

All CustomResources are treated as *unstructured.Unstructured, leading
the watch cache to log anything related to CRs as Unstructured. This
change uses the schema.GroupResource instead of object type for all type
related log messages in the watch cache, resulting in distinct output
for each CR type.

Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
This commit is contained in:
Andy Goldstein 2022-08-11 15:45:04 -04:00
parent 619f1cf552
commit 397533a4c2
5 changed files with 50 additions and 21 deletions

View File

@ -56,6 +56,7 @@ func StorageWithCacher() generic.StorageDecorator {
cacherConfig := cacherstorage.Config{ cacherConfig := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: storage.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
GroupResource: storageConfig.GroupResource,
ResourcePrefix: resourcePrefix, ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc, KeyFunc: keyFunc,
NewFunc: newFunc, NewFunc: newFunc,

View File

@ -2318,6 +2318,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
config := cacherstorage.Config{ config := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: storage.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: podPrefix, ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs, GetAttrsFunc: getPodAttrs,

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -69,6 +70,9 @@ type Config struct {
// An underlying storage.Versioner. // An underlying storage.Versioner.
Versioner storage.Versioner Versioner storage.Versioner
// The GroupResource the cacher is caching. Used for disambiguating *unstructured.Unstructured (CRDs) in logging.
GroupResource schema.GroupResource
// The Cache will be caching objects of a given Type and assumes that they // The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database. // are all stored under ResourcePrefix directory in the underlying database.
ResourcePrefix string ResourcePrefix string
@ -146,13 +150,13 @@ func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool
} }
} }
func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) { func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done func(*cacheWatcher)) {
// note that we don't have to call setDrainInputBufferLocked method on the watchers // note that we don't have to call setDrainInputBufferLocked method on the watchers
// because we take advantage of the default value - stop immediately // because we take advantage of the default value - stop immediately
// also watchers that have had already its draining strategy set // also watchers that have had already its draining strategy set
// are no longer available (they were removed from the allWatchers and the valueWatchers maps) // are no longer available (they were removed from the allWatchers and the valueWatchers maps)
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 { if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
klog.Warningf("Terminating all watchers from cacher %v", objectType) klog.Warningf("Terminating all watchers from cacher %v", groupResource)
} }
i.allWatchers.terminateAll(done) i.allWatchers.terminateAll(done)
for _, watchers := range i.valueWatchers { for _, watchers := range i.valueWatchers {
@ -258,6 +262,8 @@ type Cacher struct {
// Expected type of objects in the underlying cache. // Expected type of objects in the underlying cache.
objectType reflect.Type objectType reflect.Type
// Used for logging, to disambiguate *unstructured.Unstructured (CRDs)
groupResource schema.GroupResource
// "sliding window" of recent changes of objects and the current state. // "sliding window" of recent changes of objects and the current state.
watchCache *watchCache watchCache *watchCache
@ -347,6 +353,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
ready: newReady(), ready: newReady(),
storage: config.Storage, storage: config.Storage,
objectType: objType, objectType: objType,
groupResource: config.GroupResource,
versioner: config.Versioner, versioner: config.Versioner,
newFunc: config.NewFunc, newFunc: config.NewFunc,
indexedTrigger: indexedTrigger, indexedTrigger: indexedTrigger,
@ -422,7 +429,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
c.watchCache.SetOnReplace(func() { c.watchCache.SetOnReplace(func() {
successfulList = true successfulList = true
c.ready.set(true) c.ready.set(true)
klog.V(1).Infof("cacher (%v): initialized", c.objectType.String()) klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc() metrics.WatchCacheInitializations.WithLabelValues(c.objectType.String()).Inc()
}) })
defer func() { defer func() {
@ -437,7 +444,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// Also note that startCaching is called in a loop, so there's no need // Also note that startCaching is called in a loop, so there's no need
// to have another loop here. // to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil { if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err) klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
} }
} }
@ -508,7 +515,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// given that memory allocation may trigger GC and block the thread. // given that memory allocation may trigger GC and block the thread.
// Also note that emptyFunc is a placeholder, until we will be able // Also note that emptyFunc is a placeholder, until we will be able
// to compute watcher.forget function (which has to happen under lock). // to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier) watcher := newCacheWatcher(
chanSize,
filterWithAttrsFunction(key, pred),
emptyFunc,
c.versioner,
deadline,
pred.AllowWatchBookmarks,
c.groupResource,
identifier,
)
// We explicitly use thread unsafe version and do locking ourself to ensure that // We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked // no new events will be processed in the meantime. The watchCache will be unlocked
@ -654,7 +670,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
trace := utiltrace.New("cacher list", trace := utiltrace.New("cacher list",
utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)}, utiltrace.Field{"audit-id", endpointsrequest.GetAuditIDTruncated(ctx)},
utiltrace.Field{Key: "type", Value: c.objectType.String()}) utiltrace.Field{Key: "type", Value: c.groupResource.String()})
defer trace.LogIfLong(500 * time.Millisecond) defer trace.LogIfLong(500 * time.Millisecond)
if err := c.ready.wait(); err != nil { if err := c.ready.wait(); err != nil {
@ -757,7 +773,7 @@ func (c *Cacher) triggerValuesThreadUnsafe(event *watchCacheEvent) ([]string, bo
func (c *Cacher) processEvent(event *watchCacheEvent) { func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) { if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much. // Monitor if this gets backed up, and how much.
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen) klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen)
} }
c.incoming <- *event c.incoming <- *event
} }
@ -1008,7 +1024,7 @@ func (c *Cacher) finishDispatching() {
func (c *Cacher) terminateAllWatchers() { func (c *Cacher) terminateAllWatchers() {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
c.watchers.terminateAll(c.objectType, c.stopWatcherLocked) c.watchers.terminateAll(c.groupResource, c.stopWatcherLocked)
} }
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) { func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
@ -1176,8 +1192,7 @@ type cacheWatcher struct {
// save it here to send bookmark events before that. // save it here to send bookmark events before that.
deadline time.Time deadline time.Time
allowWatchBookmarks bool allowWatchBookmarks bool
// Object type of the cache watcher interests groupResource schema.GroupResource
objectType reflect.Type
// human readable identifier that helps assigning cacheWatcher // human readable identifier that helps assigning cacheWatcher
// instance with request // instance with request
@ -1188,7 +1203,16 @@ type cacheWatcher struct {
drainInputBuffer bool drainInputBuffer bool
} }
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher { func newCacheWatcher(
chanSize int,
filter filterWithAttrsFunc,
forget func(bool),
versioner storage.Versioner,
deadline time.Time,
allowWatchBookmarks bool,
groupResource schema.GroupResource,
identifier string,
) *cacheWatcher {
return &cacheWatcher{ return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize), input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize), result: make(chan watch.Event, chanSize),
@ -1199,7 +1223,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(bool)
versioner: versioner, versioner: versioner,
deadline: deadline, deadline: deadline,
allowWatchBookmarks: allowWatchBookmarks, allowWatchBookmarks: allowWatchBookmarks,
objectType: objectType, groupResource: groupResource,
identifier: identifier, identifier: identifier,
} }
} }
@ -1256,7 +1280,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// This means that we couldn't send event to that watcher. // This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely, // Since we don't want to block on it infinitely,
// we simply terminate it. // we simply terminate it.
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.objectType.String(), c.identifier, len(c.input), len(c.result)) klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() metrics.TerminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc()
c.forget(false) c.forget(false)
} }
@ -1461,7 +1485,7 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
} }
processingTime := time.Since(startTime) processingTime := time.Since(startTime)
if processingTime > initProcessThreshold { if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, objType, c.identifier, processingTime) klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
} }
c.process(ctx, resourceVersion) c.process(ctx, resourceVersion)

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -74,7 +75,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
} }
// set the size of the buffer of w.result to 0, so that the writes to // set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked. // w.result is blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
w.Stop() w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
@ -194,7 +195,7 @@ TestCase:
testCase.events[j].ResourceVersion = uint64(j) + 1 testCase.events[j].ResourceVersion = uint64(j) + 1
} }
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0) go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
ch := w.ResultChan() ch := w.ResultChan()
@ -266,6 +267,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
config := Config{ config := Config{
Storage: s, Storage: s,
Versioner: testVersioner{}, Versioner: testVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: prefix, ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: storage.DefaultNamespaceScopedAttr, GetAttrsFunc: storage.DefaultNamespaceScopedAttr,
@ -537,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
// timeout to zero and run the Stop goroutine concurrently. // timeout to zero and run the Stop goroutine concurrently.
// May sure that the watch will not be blocked on Stop. // May sure that the watch will not be blocked on Stop.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.Stop() go w.Stop()
select { select {
case <-done: case <-done:
@ -549,7 +551,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
deadline := time.Now().Add(time.Hour) deadline := time.Now().Add(time.Hour)
// After that, verifies the cacheWatcher.process goroutine works correctly. // After that, verifies the cacheWatcher.process goroutine works correctly.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, cancel := context.WithDeadline(context.Background(), deadline) ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel() defer cancel()
@ -674,7 +676,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
forget := func(bool) {} forget := func(bool) {}
newWatcher := func(deadline time.Time) *cacheWatcher { newWatcher := func(deadline time.Time) *cacheWatcher {
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "") return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
} }
clock := testingclock.NewFakeClock(time.Now()) clock := testingclock.NewFakeClock(time.Now())
@ -1616,7 +1618,7 @@ func TestCacheWatcherDraining(t *testing.T) {
makeWatchCacheEvent(5), makeWatchCacheEvent(5),
makeWatchCacheEvent(6), makeWatchCacheEvent(6),
} }
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher") t.Fatal("failed adding an even to the watcher")
@ -1657,7 +1659,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
makeWatchCacheEvent(5), makeWatchCacheEvent(5),
makeWatchCacheEvent(6), makeWatchCacheEvent(6),
} }
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher") t.Fatal("failed adding an even to the watcher")

View File

@ -121,6 +121,7 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
config := cacherstorage.Config{ config := cacherstorage.Config{
Storage: s, Storage: s,
Versioner: v, Versioner: v,
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: prefix, ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs, GetAttrsFunc: GetAttrs,