diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go
index 032fce5d0a8..c3a8c7ae975 100644
--- a/pkg/storage/cacher.go
+++ b/pkg/storage/cacher.go
@@ -68,13 +68,23 @@ type CacherConfig struct {
 type Cacher struct {
 	sync.RWMutex
 
+	// Each user-facing method that is not simply redirected to the underlying
+	// storage has to read-lock on this mutex before starting any processing.
+	// This is necessary to prevent users from accessing structures that are
+	// uninitialized or are being repopulated right now.
+	// NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
+	// interactions of Cacher with the underlying WatchCache. Since Cacher is
+	// calling WatchCache directly and WatchCache is calling Cacher methods
+	// via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
+	// of both structures are held, the one from WatchCache is acquired first
+	// to avoid deadlocks. Unfortunately, enforcing this rule in startCaching
+	// would be very difficult and introducing one more mutex seems to be much
+	// easier.
+	usable sync.RWMutex
+
 	// Underlying storage.Interface.
 	storage Interface
 
-	// Whether Cacher is initialized.
-	initialized sync.WaitGroup
-	initOnce    sync.Once
-
 	// "sliding window" of recent changes of objects and the current state.
 	watchCache *cache.WatchCache
 	reflector  *cache.Reflector
@@ -98,44 +108,44 @@ func NewCacher(config CacherConfig) *Cacher {
 	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 
 	cacher := &Cacher{
-		initialized: sync.WaitGroup{},
-		storage:     config.Storage,
-		watchCache:  watchCache,
-		reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
-		watcherIdx:  0,
-		watchers:    make(map[int]*cacheWatcher),
-		versioner:   config.Versioner,
-		keyFunc:     config.KeyFunc,
+		usable:     sync.RWMutex{},
+		storage:    config.Storage,
+		watchCache: watchCache,
+		reflector:  cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
+		watcherIdx: 0,
+		watchers:   make(map[int]*cacheWatcher),
+		versioner:  config.Versioner,
+		keyFunc:    config.KeyFunc,
 	}
-	cacher.initialized.Add(1)
+	cacher.usable.Lock() // See the startCaching method for an explanation.
-	watchCache.SetOnReplace(func() {
-		cacher.initOnce.Do(func() { cacher.initialized.Done() })
-		cacher.Unlock()
-	})
+	watchCache.SetOnReplace(func() { cacher.usable.Unlock() })
 	watchCache.SetOnEvent(cacher.processEvent)
 
 	stopCh := config.StopChannel
 	go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh)
-	cacher.initialized.Wait()
 	return cacher
 }
 
 func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
-	c.Lock()
+	// Whenever we enter the startCaching method, the usable mutex is held.
+	// We explicitly do NOT Unlock it in this method, because we do
+	// not want to allow any Watch/List methods not explicitly redirected
+	// to the underlying storage when the cache is being initialized.
+	// Once the underlying cache is propagated, the onReplace handler will
+	// be called, which will do the usable.Unlock() as configured in
+	// NewCacher().
+	// Note: the same behavior is also triggered every time we fall out of
+	// the backend storage watch event window.
+	defer c.usable.Lock()
+
 	c.terminateAllWatchers()
-	// We explicitly do NOT Unlock() in this method.
-	// This is because we do not want to allow any WATCH/LIST methods before
-	// the cache is initialized. Once the underlying cache is propagated,
-	// onReplace handler will be called, which will do the Unlock() as
-	// configured in NewCacher().
-	// Note: the same bahavior is also triggered every time we fall out of
-	// backen storage (e.g. etcd's) watch event window.
 	// Note that since onReplace may be not called due to errors, we explicitly
 	// need to retry it on errors under lock.
 	for {
-		err := c.reflector.ListAndWatch(stopChannel)
-		if err == nil {
+		if err := c.reflector.ListAndWatch(stopChannel); err != nil {
+			glog.Errorf("unexpected ListAndWatch error: %v", err)
+		} else {
 			break
 		}
 	}
@@ -168,6 +178,10 @@ func (c *Cacher) Delete(key string, out runtime.Object) error {
 
 // Implements storage.Interface.
 func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
+	// Do NOT allow Watch to start when the underlying structures are not propagated.
+	c.usable.RLock()
+	defer c.usable.RUnlock()
+
 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
 	// on return from this function.
@@ -216,6 +230,10 @@ func (c *Cacher) List(key string, listObj runtime.Object) error {
 // TODO: We may consider changing to use ListFromMemory in the future, but this
 // requires wider discussion as an "api semantic change".
 func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
+	// Do NOT allow List to start when the underlying structures are not propagated.
+	c.usable.RLock()
+	defer c.usable.RUnlock()
+
 	listPtr, err := runtime.GetItemsPtr(listObj)
 	if err != nil {
 		return err
@@ -263,6 +281,8 @@ func (c *Cacher) processEvent(event cache.WatchCacheEvent) {
 }
 
 func (c *Cacher) terminateAllWatchers() {
+	c.Lock()
+	defer c.Unlock()
 	for key, watcher := range c.watchers {
 		delete(c.watchers, key)
 		watcher.stop()
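For reference, here is a minimal, self-contained sketch of the locking pattern this diff introduces: a `usable` RWMutex that starts write-locked, is unlocked by a background initializer once the cache is populated, and is read-locked by every user-facing method so callers block until initialization completes. All names in the sketch (`store`, `fill`, `Get`) are hypothetical stand-ins, not part of the Cacher code; only the locking choreography mirrors the change.

```go
// A sketch of the "usable" gate pattern, under assumed names.
package main

import (
	"fmt"
	"sync"
	"time"
)

type store struct {
	sync.RWMutex // guards data, analogous to Cacher's embedded RWMutex

	// usable gates user-facing methods: write-locked while the store is
	// uninitialized or being repopulated, read-locked by every reader.
	usable sync.RWMutex

	data map[string]string
}

func newStore() *store {
	s := &store{data: make(map[string]string)}
	// Hold the gate until the background fill completes (mirrors
	// cacher.usable.Lock() in NewCacher). Go's sync.RWMutex permits
	// unlocking from a different goroutine than the one that locked it,
	// which this pattern relies on.
	s.usable.Lock()
	go s.fill()
	return s
}

// fill plays the role of startCaching plus the OnReplace hook: it
// populates the store and only then opens the gate.
func (s *store) fill() {
	time.Sleep(50 * time.Millisecond) // stand-in for the initial LIST from storage

	s.Lock()
	s.data["answer"] = "42"
	s.Unlock()

	// Equivalent of watchCache.SetOnReplace(func() { cacher.usable.Unlock() }).
	s.usable.Unlock()
}

// Get blocks until the store is usable, like Cacher.Watch/ListFromMemory.
func (s *store) Get(key string) (string, bool) {
	s.usable.RLock()
	defer s.usable.RUnlock()

	s.RLock()
	defer s.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

func main() {
	s := newStore()
	// This call parks on usable.RLock() until fill() has run.
	if v, ok := s.Get("answer"); ok {
		fmt.Println("got:", v)
	}
}
```

The sketch omits the re-initialization path: in the actual change, startCaching re-acquires the gate via `defer c.usable.Lock()` whenever the reflector falls out of the backend storage watch event window, so readers are blocked again while the cache is repopulated.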