Merge pull request #13135 from wojtek-t/fix_cacher_deadlock

Fix deadlock in Cacher on etcd errors
Committed by Filip Grzadkowski on 2015-08-26 10:46:24 +02:00
commit 9f1d2af5fb

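The change below drops the initialized WaitGroup / initOnce pair and gates the cache behind a dedicated usable RWMutex instead: the mutex is write-locked until the first successful list populates the watch cache, the OnReplace hook unlocks it, startCaching re-locks it (via defer) whenever it falls out of the watch event window and has to re-list, and user-facing methods such as Watch and ListFromMemory read-lock it before touching cached state. A minimal, self-contained sketch of that readiness-gate pattern, assuming nothing about the real Cacher API (gatedCache, listFromStorage and friends are illustrative names only):

// Sketch of the readiness-gate pattern adopted by this fix (illustrative
// names, not the real Cacher API): the gate is write-locked until the first
// successful list, readers block on RLock, and transient storage errors are
// retried instead of leaving waiters dead-locked.
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

type gatedCache struct {
    sync.RWMutex              // guards data, like the Cacher's main mutex
    usable   sync.RWMutex     // readiness gate, like Cacher.usable
    data     map[string]string
    attempts int
}

func newGatedCache(stopCh <-chan struct{}) *gatedCache {
    c := &gatedCache{}
    // Start with the gate held; the caching goroutine releases it once the
    // cache is populated (the Cacher does this from its OnReplace hook).
    c.usable.Lock()
    go c.startCaching(stopCh)
    return c
}

// listFromStorage stands in for reflector.ListAndWatch and fails twice to
// mimic transient etcd errors.
func (c *gatedCache) listFromStorage() (map[string]string, error) {
    c.attempts++
    if c.attempts < 3 {
        return nil, errors.New("transient etcd error")
    }
    return map[string]string{"pod-1": "Running"}, nil
}

func (c *gatedCache) startCaching(stopCh <-chan struct{}) {
    for {
        select {
        case <-stopCh:
            return
        default:
        }
        // Retry under the held gate: an error must not be able to dead-lock
        // callers waiting for the cache to become usable.
        items, err := c.listFromStorage()
        if err != nil {
            fmt.Println("unexpected list error:", err)
            time.Sleep(10 * time.Millisecond)
            continue
        }
        c.Lock()
        c.data = items
        c.Unlock()
        // Open the gate for readers. In the real Cacher the gate is re-locked
        // (via defer) whenever the watch loop falls out of the event window
        // and has to re-list.
        c.usable.Unlock()
        return
    }
}

// Get blocks until the cache has been populated at least once, the way
// Cacher.Watch and Cacher.ListFromMemory read-lock 'usable' first.
func (c *gatedCache) Get(key string) (string, bool) {
    c.usable.RLock()
    defer c.usable.RUnlock()
    c.RLock()
    defer c.RUnlock()
    v, ok := c.data[key]
    return v, ok
}

func main() {
    stopCh := make(chan struct{})
    defer close(stopCh)
    c := newGatedCache(stopCh)
    v, ok := c.Get("pod-1") // blocks until the third list attempt succeeds
    fmt.Println(v, ok)      // Running true
}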

@@ -68,13 +68,23 @@ type CacherConfig struct {
 type Cacher struct {
     sync.RWMutex
+    // Each user-facing method that is not simply redirected to the underlying
+    // storage has to read-lock on this mutex before starting any processing.
+    // This is necessary to prevent users from accessing structures that are
+    // uninitialized or are being repopulated right now.
+    // NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
+    // interactions of Cacher with the underlying WatchCache. Since Cacher is
+    // calling WatchCache directly and WatchCache is calling Cacher methods
+    // via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
+    // of both structures are held, the one from WatchCache is acquired first
+    // to avoid deadlocks. Unfortunately, forcing this rule in startCaching
+    // would be very difficult and introducing one more mutex seems to be much
+    // easier.
+    usable sync.RWMutex
     // Underlying storage.Interface.
     storage Interface
-    // Whether Cacher is initialized.
-    initialized sync.WaitGroup
-    initOnce sync.Once
     // "sliding window" of recent changes of objects and the current state.
     watchCache *cache.WatchCache
     reflector *cache.Reflector
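The NOTE above assumes a fixed lock ordering: whenever a single call path holds both mutexes, the WatchCache's is acquired before the Cacher's. A small, self-contained illustration of why a consistent order rules out the AB/BA deadlock (backingCache and frontCache are made-up stand-ins, not the real types):

// A toy model of the lock-ordering rule from the NOTE above: backingCache
// plays the WatchCache role, frontCache plays the Cacher role, and every
// path that needs both mutexes takes backingCache's first.
package main

import (
    "fmt"
    "sync"
)

type backingCache struct {
    sync.Mutex
    events  []string
    onEvent func(string) // invoked while backingCache's mutex is held
}

type frontCache struct {
    sync.Mutex
    bc   *backingCache
    seen int
}

// Add mimics the watch path: backingCache locks itself, then the hook locks
// frontCache. Acquisition order: backingCache -> frontCache.
func (b *backingCache) Add(ev string) {
    b.Lock()
    defer b.Unlock()
    b.events = append(b.events, ev)
    b.onEvent(ev)
}

func (f *frontCache) handleEvent(ev string) {
    f.Lock()
    defer f.Unlock()
    f.seen++
}

// Snapshot mimics a user-facing call that needs both structures. It uses the
// same backingCache -> frontCache order; locking frontCache first and then
// calling into backingCache could interleave with Add into a deadlock.
func (f *frontCache) Snapshot() (stored, seen int) {
    f.bc.Lock()
    defer f.bc.Unlock()
    f.Lock()
    defer f.Unlock()
    return len(f.bc.events), f.seen
}

func main() {
    bc := &backingCache{}
    fc := &frontCache{bc: bc}
    bc.onEvent = fc.handleEvent

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            bc.Add(fmt.Sprintf("event-%d", i))
            fc.Snapshot()
        }(i)
    }
    wg.Wait()
    stored, seen := fc.Snapshot()
    fmt.Println(stored, seen) // 100 100
}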
@@ -98,7 +108,7 @@ func NewCacher(config CacherConfig) *Cacher {
     listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
     cacher := &Cacher{
-        initialized: sync.WaitGroup{},
+        usable: sync.RWMutex{},
         storage: config.Storage,
         watchCache: watchCache,
         reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
@@ -107,35 +117,35 @@ func NewCacher(config CacherConfig) *Cacher {
         versioner: config.Versioner,
         keyFunc: config.KeyFunc,
     }
-    cacher.initialized.Add(1)
+    cacher.usable.Lock()
     // See startCaching method for an explanation of it.
-    watchCache.SetOnReplace(func() {
-        cacher.initOnce.Do(func() { cacher.initialized.Done() })
-        cacher.Unlock()
-    })
+    watchCache.SetOnReplace(func() { cacher.usable.Unlock() })
     watchCache.SetOnEvent(cacher.processEvent)
     stopCh := config.StopChannel
     go util.Until(func() { cacher.startCaching(stopCh) }, 0, stopCh)
-    cacher.initialized.Wait()
     return cacher
 }

 func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
-    c.Lock()
+    // Whenever we enter the startCaching method, the usable mutex is held.
+    // We explicitly do NOT Unlock it in this method, because we do
+    // not want to allow any Watch/List methods not explicitly redirected
+    // to the underlying storage when the cache is being initialized.
+    // Once the underlying cache is propagated, the onReplace handler will
+    // be called, which will do the usable.Unlock() as configured in
+    // NewCacher().
+    // Note: the same behavior is also triggered every time we fall out of
+    // the backend storage watch event window.
+    defer c.usable.Lock()
     c.terminateAllWatchers()
-    // We explicitly do NOT Unlock() in this method.
-    // This is because we do not want to allow any WATCH/LIST methods before
-    // the cache is initialized. Once the underlying cache is propagated,
-    // the onReplace handler will be called, which will do the Unlock() as
-    // configured in NewCacher().
-    // Note: the same behavior is also triggered every time we fall out of
-    // the backend storage (e.g. etcd's) watch event window.
+    // Note that since onReplace may not be called due to errors, we explicitly
+    // need to retry it on errors under lock.
     for {
-        err := c.reflector.ListAndWatch(stopChannel)
-        if err == nil {
+        if err := c.reflector.ListAndWatch(stopChannel); err != nil {
+            glog.Errorf("unexpected ListAndWatch error: %v", err)
+        } else {
             break
         }
     }
@@ -168,6 +178,10 @@ func (c *Cacher) Delete(key string, out runtime.Object) error {

 // Implements storage.Interface.
 func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
+    // Do NOT allow Watch to start when the underlying structures are not propagated.
+    c.usable.RLock()
+    defer c.usable.RUnlock()
+
     // We explicitly use the thread-unsafe version and do locking ourselves to ensure that
     // no new events will be processed in the meantime. The watchCache will be unlocked
     // on return from this function.
@@ -216,6 +230,10 @@ func (c *Cacher) List(key string, listObj runtime.Object) error {
 // TODO: We may consider changing to use ListFromMemory in the future, but this
 // requires wider discussion as an "api semantic change".
 func (c *Cacher) ListFromMemory(key string, listObj runtime.Object) error {
+    // Do NOT allow List to start when the underlying structures are not propagated.
+    c.usable.RLock()
+    defer c.usable.RUnlock()
+
     listPtr, err := runtime.GetItemsPtr(listObj)
     if err != nil {
         return err
@@ -263,6 +281,8 @@ func (c *Cacher) processEvent(event cache.WatchCacheEvent) {
 }

 func (c *Cacher) terminateAllWatchers() {
+    c.Lock()
+    defer c.Unlock()
     for key, watcher := range c.watchers {
         delete(c.watchers, key)
         watcher.stop()