Merge pull request #75717 from wojtek-t/reduce_critical_sections

Reduce critical sections in cacher::Watch function
Kubernetes Prow Robot 2019-03-26 18:11:02 -07:00 committed by GitHub
commit df9e66628c
2 changed files with 35 additions and 27 deletions
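
The shape of the refactor, condensed: allocate the watcher before taking the Cacher lock, hold the lock only long enough to register the watcher and compute its forget function (which needs the watcher index), and start the processing goroutine after the lock is released. Below is a minimal, hypothetical Go sketch of that pattern; registry, watcher, and their fields are simplified stand-ins, not the actual Kubernetes types, and the watchCache read lock is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// watcher is a simplified stand-in for cacheWatcher.
type watcher struct {
	input  chan string
	forget func() // placeholder (like emptyFunc) until registration fills it in
}

// registry is a simplified stand-in for the Cacher's watcher bookkeeping.
type registry struct {
	sync.Mutex
	watchers   map[int]*watcher
	watcherIdx int
}

func (r *registry) watch(initEvents []string) *watcher {
	// 1. Allocation (which can trigger GC and stall the thread) happens
	//    before any lock is taken.
	w := &watcher{input: make(chan string, 8), forget: func() {}}

	// 2. Only registration runs inside the critical section.
	func() {
		r.Lock()
		defer r.Unlock()
		idx := r.watcherIdx
		// forget can only be computed once the index is known.
		w.forget = func() {
			r.Lock()
			defer r.Unlock()
			delete(r.watchers, idx)
		}
		r.watchers[idx] = w
		r.watcherIdx++
	}()

	// 3. The processing goroutine starts after the lock is released.
	go func() {
		for _, e := range initEvents {
			w.input <- e
		}
	}()
	return w
}

func main() {
	r := &registry{watchers: map[int]*watcher{}}
	w := r.watch([]string{"add:pod-1", "add:pod-2"})
	fmt.Println(<-w.input, <-w.input)
	w.forget()
}
```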

cacher.go

@@ -53,6 +53,7 @@ var (
 		},
 		[]string{"resource"},
 	)
+	emptyFunc = func() {}
 )
 
 func init() {
@@ -339,21 +340,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 	c.ready.wait()
 
-	// We explicitly use thread unsafe version and do locking ourself to ensure that
-	// no new events will be processed in the meantime. The watchCache will be unlocked
-	// on return from this function.
-	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
-	// underlying watchCache is calling processEvent under its lock.
-	c.watchCache.RLock()
-	defer c.watchCache.RUnlock()
-	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
-	if err != nil {
-		// To match the uncached watch implementation, once we have passed authn/authz/admission,
-		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
-		// rather than a directly returned error.
-		return newErrWatcher(err), nil
-	}
-
 	triggerValue, triggerSupported := "", false
 	// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
 	// passed here is aware of exactly the same trigger (at most one).
@@ -377,6 +363,25 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		chanSize = 1000
 	}
 
+	// Create a watcher here to reduce memory allocations under lock,
+	// given that memory allocation may trigger GC and block the thread.
+	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
+
+	// We explicitly use thread unsafe version and do locking ourself to ensure that
+	// no new events will be processed in the meantime. The watchCache will be unlocked
+	// on return from this function.
+	// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
+	// underlying watchCache is calling processEvent under its lock.
+	c.watchCache.RLock()
+	defer c.watchCache.RUnlock()
+	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+	if err != nil {
+		// To match the uncached watch implementation, once we have passed authn/authz/admission,
+		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+		// rather than a directly returned error.
+		return newErrWatcher(err), nil
+	}
+
 	// With some events already sent, update resourceVersion so that
 	// events that were buffered and not yet processed won't be delivered
 	// to this watcher second time causing going back in time.
@@ -384,13 +389,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		watchRV = initEvents[len(initEvents)-1].ResourceVersion
 	}
 
-	c.Lock()
-	defer c.Unlock()
-	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
-	watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)
+	func() {
+		c.Lock()
+		defer c.Unlock()
+
+		// Update watcher.forget function once we can compute it.
+		watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
+		c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
+		c.watcherIdx++
+	}()
 
-	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
-	c.watcherIdx++
+	go watcher.process(initEvents, watchRV)
 	return watcher, nil
 }
@@ -879,8 +887,8 @@ type cacheWatcher struct {
 	versioner storage.Versioner
 }
 
-func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
-	watcher := &cacheWatcher{
+func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
+	return &cacheWatcher{
 		input:  make(chan *watchCacheEvent, chanSize),
 		result: make(chan watch.Event, chanSize),
 		done:   make(chan struct{}),
@@ -889,8 +897,6 @@ func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
 		forget:    forget,
 		versioner: versioner,
 	}
-	go watcher.process(initEvents, resourceVersion)
-	return watcher
 }
 
 // Implements watch.Interface.
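
The enabling change is in the constructor: newCacheWatcher no longer receives initEvents or a resource version and no longer spawns the goroutine itself, so construction can happen before any lock is taken and processing starts only after registration completes. A stdlib-only sketch of this construct-then-start split (miniWatcher and newMiniWatcher are illustrative names, not the real API; the real process also drains the watcher's input channel, elided here):

```go
package main

import "fmt"

type event struct {
	resourceVersion uint64
	object          string
}

// miniWatcher mirrors the new shape of cacheWatcher: the constructor
// only allocates; nothing runs until process is started by the caller.
type miniWatcher struct {
	input  chan *event
	result chan string
}

func newMiniWatcher(chanSize int) *miniWatcher {
	return &miniWatcher{
		input:  make(chan *event, chanSize),
		result: make(chan string, chanSize),
	}
}

// process replays initEvents first; resourceVersion marks where buffered
// input events would resume (unused in this sketch, which stops after
// the replay instead of draining input).
func (w *miniWatcher) process(initEvents []*event, resourceVersion uint64) {
	for _, e := range initEvents {
		w.result <- e.object
	}
	close(w.result)
}

func main() {
	w := newMiniWatcher(2)
	go w.process([]*event{{1, "pod-a"}, {2, "pod-b"}}, 0)
	for obj := range w.result {
		fmt.Println(obj)
	}
}
```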

cacher_whitebox_test.go

@@ -63,7 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w = newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
+	w = newCacheWatcher(0, filter, forget, testVersioner{})
+	go w.process(initEvents, 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -181,7 +182,8 @@ TestCase:
 		for j := range testCase.events {
 			testCase.events[j].ResourceVersion = uint64(j) + 1
 		}
-		w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{})
+		w := newCacheWatcher(0, filter, forget, testVersioner{})
+		go w.process(testCase.events, 0)
 		ch := w.ResultChan()
 		for j, event := range testCase.expected {
 			e := <-ch
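
Both test updates follow the same recipe: build the watcher with a zero-sized buffer, then start process by hand, which lets the first test verify that Stop unblocks a watcher whose result channel nobody reads. A self-contained sketch of that blocking scenario (blockedWatcher is a made-up stand-in for cacheWatcher, not the test's actual code):

```go
package main

import (
	"fmt"
	"time"
)

// blockedWatcher reproduces the scenario the first test exercises:
// a result channel with zero buffer, so every send blocks until read.
type blockedWatcher struct {
	result chan string
	done   chan struct{}
}

func newBlockedWatcher(chanSize int) *blockedWatcher {
	return &blockedWatcher{
		result: make(chan string, chanSize),
		done:   make(chan struct{}),
	}
}

// process must bail out when Stop closes done; otherwise an unread
// result channel would leak this goroutine forever.
func (w *blockedWatcher) process(events []string) {
	for _, e := range events {
		select {
		case w.result <- e:
		case <-w.done:
			return
		}
	}
}

func (w *blockedWatcher) Stop() { close(w.done) }

func main() {
	w := newBlockedWatcher(0) // zero buffer: the first send blocks
	go w.process([]string{"e1", "e2"})
	time.Sleep(10 * time.Millisecond) // let process block on the send
	w.Stop()                          // must unblock process without any reads
	time.Sleep(10 * time.Millisecond)
	fmt.Println("watcher stopped; no goroutine stuck on result")
}
```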