From b78f0b31063b7d47781a1ce9ee4ed6c118fb949f Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Mon, 11 Jan 2021 12:33:52 +0100 Subject: [PATCH] Improve logging in cacheWatcher --- .../apiserver/pkg/storage/cacher/cacher.go | 16 ++++++++++++---- .../pkg/storage/cacher/cacher_whitebox_test.go | 10 +++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 03883fe3b78..de245c3a476 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -487,11 +487,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // Determine watch timeout('0' means deadline is not set, ignore checking) deadline, _ := ctx.Deadline() + + identifier := fmt.Sprintf("key: %q, labels: %q, fields: %q", key, pred.Label, pred.Field) + // Create a watcher here to reduce memory allocations under lock, // given that memory allocation may trigger GC and block the thread. // Also note that emptyFunc is a placeholder, until we will be able // to compute watcher.forget function (which has to happen under lock). - watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType) + watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier) // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -1194,9 +1197,13 @@ type cacheWatcher struct { allowWatchBookmarks bool // Object type of the cache watcher interests objectType reflect.Type + + // human readable identifier that helps assigning cacheWatcher + // instance with request + identifier string } -func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher { +func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type, identifier string) *cacheWatcher { return &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -1208,6 +1215,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve deadline: deadline, allowWatchBookmarks: allowWatchBookmarks, objectType: objectType, + identifier: identifier, } } @@ -1250,7 +1258,7 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. - klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String()) + klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v", c.objectType.String(), c.identifier) terminatedWatchersCounter.WithLabelValues(c.objectType.String()).Inc() c.forget() } @@ -1402,7 +1410,7 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { - klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime) + klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime) } defer close(c.result) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 13deb0ee591..f3a6c08634b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -70,7 +70,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType) + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") go w.process(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { @@ -190,7 +190,7 @@ TestCase: testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType) + w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") go w.process(context.Background(), testCase.events, 0) ch := w.ResultChan() @@ -527,7 +527,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType) + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") go w.Stop() select { case <-done: @@ -539,7 +539,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { deadline := time.Now().Add(time.Hour) // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType) + w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, _ := context.WithDeadline(context.Background(), deadline) go w.process(ctx, nil, 0) @@ -598,7 +598,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) { forget := func() {} newWatcher := func(deadline time.Time) *cacheWatcher { - return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType) + return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType, "") } clock := clock.NewFakeClock(time.Now())