diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 0f5c0848dae..2cd90d64a99 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -53,6 +53,7 @@ var ( }, []string{"resource"}, ) + emptyFunc = func() {} ) func init() { @@ -339,21 +340,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.ready.wait() - // We explicitly use thread unsafe version and do locking ourself to ensure that - // no new events will be processed in the meantime. The watchCache will be unlocked - // on return from this function. - // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the - // underlying watchCache is calling processEvent under its lock. - c.watchCache.RLock() - defer c.watchCache.RUnlock() - initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) - if err != nil { - // To match the uncached watch implementation, once we have passed authn/authz/admission, - // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, - // rather than a directly returned error. - return newErrWatcher(err), nil - } - triggerValue, triggerSupported := "", false // TODO: Currently we assume that in a given Cacher object, any that is // passed here is aware of exactly the same trigger (at most one). @@ -377,6 +363,25 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, chanSize = 1000 } + // Create a watcher here to reduce memory allocations under lock, + // given that memory allocation may trigger GC and block the thread. + watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner) + + // We explicitly use thread unsafe version and do locking ourself to ensure that + // no new events will be processed in the meantime. The watchCache will be unlocked + // on return from this function. + // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the + // underlying watchCache is calling processEvent under its lock. + c.watchCache.RLock() + defer c.watchCache.RUnlock() + initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } + // With some events already sent, update resourceVersion so that // events that were buffered and not yet processed won't be delivered // to this watcher second time causing going back in time. @@ -384,13 +389,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, watchRV = initEvents[len(initEvents)-1].ResourceVersion } - c.Lock() - defer c.Unlock() - forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner) + func() { + c.Lock() + defer c.Unlock() + // Update watcher.forget function once we can compute it. + watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) + c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) + c.watcherIdx++ + }() - c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) - c.watcherIdx++ + go watcher.process(initEvents, watchRV) return watcher, nil } @@ -879,8 +887,8 @@ type cacheWatcher struct { versioner storage.Versioner } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher { - watcher := &cacheWatcher{ +func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher { + return &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), done: make(chan struct{}), @@ -889,8 +897,6 @@ func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCa forget: forget, versioner: versioner, } - go watcher.process(initEvents, resourceVersion) - return watcher } // Implements watch.Interface. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 2de915eb5a9..f102095f05d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -63,7 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{}) + w = newCacheWatcher(0, filter, forget, testVersioner{}) + go w.process(initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -181,7 +182,8 @@ TestCase: for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{}) + w := newCacheWatcher(0, filter, forget, testVersioner{}) + go w.process(testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch