From 3fb020b28d1fb386de923adcf5fbbea74b075cde Mon Sep 17 00:00:00 2001
From: Daniel Smith
Date: Fri, 19 Feb 2016 17:45:02 -0800
Subject: [PATCH] Fix a locking bug in the cacher.

---
 pkg/storage/cacher.go      | 29 +++++++++++++++++------------
 pkg/storage/cacher_test.go | 16 ++++++++++++++++
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go
index e2ef0b79a44..365e224e875 100644
--- a/pkg/storage/cacher.go
+++ b/pkg/storage/cacher.go
@@ -171,9 +171,8 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
 		stopCh:     make(chan struct{}),
 		stopWg:     sync.WaitGroup{},
 	}
+	// See startCaching method for explanation and where this is unlocked.
 	cacher.usable.Lock()
-	// See startCaching method for why explanation on it.
-	watchCache.SetOnReplace(func() { cacher.usable.Unlock() })
 	watchCache.SetOnEvent(cacher.processEvent)
 
 	stopCh := cacher.stopCh
@@ -192,16 +191,22 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
 }
 
 func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
-	// Whenever we enter startCaching method, usable mutex is held.
-	// We explicitly do NOT Unlock it in this method, because we do
-	// not want to allow any Watch/List methods not explicitly redirected
-	// to the underlying storage when the cache is being initialized.
-	// Once the underlying cache is propagated, onReplace handler will
-	// be called, which will do the usable.Unlock() as configured in
-	// NewCacher().
-	// Note: the same behavior is also triggered every time we fall out of
-	// backend storage watch event window.
-	defer c.usable.Lock()
+	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
+	// It is safe to use the cache after a successful list until a disconnection.
+	// We start with usable (write) locked. The below OnReplace function will
+	// unlock it after a successful list. The below defer will then re-lock
+	// it when this function exits (always due to disconnection), only if
+	// we actually got a successful list. This cycle will repeat as needed.
+	successfulList := false
+	c.watchCache.SetOnReplace(func() {
+		successfulList = true
+		c.usable.Unlock()
+	})
+	defer func() {
+		if successfulList {
+			c.usable.Lock()
+		}
+	}()
 
 	c.terminateAllWatchers()
 	// Note that since onReplace may be not called due to errors, we explicitly
diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go
index 36d87afd636..d15a17f06e4 100644
--- a/pkg/storage/cacher_test.go
+++ b/pkg/storage/cacher_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package storage_test
 
 import (
+	"fmt"
 	"reflect"
 	"strconv"
 	"testing"
@@ -171,8 +172,23 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType
 	}
 }
 
+type injectListError struct {
+	errors int
+	storage.Interface
+}
+
+func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
+	if self.errors > 0 {
+		self.errors--
+		return fmt.Errorf("injected error")
+	}
+	return self.Interface.List(ctx, key, resourceVersion, filter, listObj)
+}
+
 func TestWatch(t *testing.T) {
 	server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
+	// Inject one list error to make sure we test the relist case.
+	etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
 	defer server.Terminate(t)
 	cacher := newTestCacher(etcdStorage)
 	defer cacher.Stop()
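
For readers unfamiliar with the pattern the patch adopts, here is a minimal standalone sketch of the "usable" gate: a sync.RWMutex is write-locked while the cache is not safe to read, unlocked after a successful list, and re-locked on disconnection only if that list actually succeeded, so Lock/Unlock calls stay balanced when a list fails. This is not the Kubernetes Cacher code; the names (cache, fill, runOnce, serve, connected) are invented for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// cache stands in for the Cacher: 'usable' is write-locked whenever the
// cached data is not safe to read.
type cache struct {
	usable sync.RWMutex
	data   string
}

// fill simulates one list attempt against the backend; attempt 0 fails,
// mirroring the injected list error in the test above.
func fill(attempt int) (string, error) {
	if attempt == 0 {
		return "", errors.New("injected list error")
	}
	return fmt.Sprintf("snapshot %d", attempt), nil
}

// runOnce mirrors startCaching: the gate is assumed to be held on entry,
// it is released only after a successful list, and the deferred re-lock
// fires on exit only if that release actually happened.
func (c *cache) runOnce(attempt int, connected func()) {
	successfulList := false
	defer func() {
		if successfulList {
			c.usable.Lock() // stale again; block readers until the next successful list
		}
	}()

	data, err := fill(attempt)
	if err != nil {
		return // gate stays locked; readers keep going to the backend
	}
	c.data = data
	successfulList = true
	c.usable.Unlock() // readers may RLock from here until this function returns

	connected() // stand-in for the time spent watching the backend
}

// serve is a reader: it may use the cache only while the gate is RLock-able.
func (c *cache) serve() {
	c.usable.RLock()
	defer c.usable.RUnlock()
	fmt.Println("served from cache:", c.data)
}

func main() {
	c := &cache{}
	c.usable.Lock() // start write-locked, as NewCacherFromConfig does

	// Attempt 0 fails its list: the gate stays locked and nothing is unlocked twice.
	c.runOnce(0, func() {})

	// Attempt 1 succeeds: while "connected", readers can use the cache.
	c.runOnce(1, func() { c.serve() })

	// The deferred re-lock fired when attempt 1 disconnected, so the cache
	// stayed unusable until this relist succeeded.
	c.runOnce(2, func() { c.serve() })
}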