diff --git a/pkg/client/unversioned/cache/watch_cache.go b/pkg/client/unversioned/cache/watch_cache.go index 7a1aa7c1ce8..a40c83a7cd6 100644 --- a/pkg/client/unversioned/cache/watch_cache.go +++ b/pkg/client/unversioned/cache/watch_cache.go @@ -252,10 +252,7 @@ func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) { w.onEvent = onEvent } -func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { - w.RLock() - defer w.RUnlock() - +func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) { size := w.endIndex - w.startIndex oldest := w.resourceVersion if size > 0 { @@ -277,3 +274,9 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEven } return result, nil } + +func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) { + w.RLock() + defer w.RUnlock() + return w.GetAllEventsSinceThreadUnsafe(resourceVersion) +} diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index ad2896c4191..76ae5915d0e 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -136,13 +136,20 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { // Implements Watch (signature from storage.Interface). func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - c.Lock() - defer c.Unlock() - - initEvents, err := c.watchCache.GetAllEventsSince(resourceVersion) + // We explicitly use thread unsafe version and do locking ourself to ensure that + // no new events will be processed in the meantime. The watchCache will be unlocked + // on return from this function. + // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the + // underlying watchCache is calling processEvent under its lock. + c.watchCache.RLock() + defer c.watchCache.RUnlock() + initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion) if err != nil { return nil, err } + + c.Lock() + defer c.Unlock() watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) c.watchers[c.watcherIdx] = watcher c.watcherIdx++