From aa472ff734d9261ac96a85c35b8bcec5e956a3dd Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 5 Jun 2016 08:08:31 -0700 Subject: [PATCH] cacher: replace usable lock with conditional variable --- pkg/storage/cacher.go | 68 +++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 10760d9965a..e258e5f1395 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -72,19 +72,13 @@ type CacherConfig struct { type Cacher struct { sync.RWMutex - // Each user-facing method that is not simply redirected to the underlying - // storage has to read-lock on this mutex before starting any processing. + // Before accessing the cacher's cache, wait for the ready to be ok. // This is necessary to prevent users from accessing structures that are // uninitialized or are being repopulated right now. - // NOTE: We cannot easily reuse the main mutex for it due to multi-threaded - // interactions of Cacher with the underlying WatchCache. Since Cacher is - // caling WatchCache directly and WatchCache is calling Cacher methods - // via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes - // of both structures are held, the one from WatchCache is acquired first - // to avoid deadlocks. Unfortunately, forcing this rule in startCaching - // would be very difficult and introducing one more mutex seems to be much - // easier. - usable sync.RWMutex + // ready needs to be set to false when the cacher is paused or stopped. + // ready needs to be set to true when the cacher is ready to use after + // initialization. + ready *ready // Underlying storage.Interface. storage Interface @@ -126,6 +120,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { } cacher := &Cacher{ + ready: newReady(), storage: config.Storage, watchCache: watchCache, reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0), @@ -139,8 +134,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { // So we will be simply closing the channel, and synchronizing on the WaitGroup. stopCh: make(chan struct{}), } - // See startCaching method for explanation and where this is unlocked. - cacher.usable.Lock() watchCache.SetOnEvent(cacher.processEvent) stopCh := cacher.stopCh @@ -168,11 +161,11 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) { successfulList := false c.watchCache.SetOnReplace(func() { successfulList = true - c.usable.Unlock() + c.ready.set(true) }) defer func() { if successfulList { - c.usable.Lock() + c.ready.set(false) } }() @@ -213,9 +206,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, return nil, err } - // Do NOT allow Watch to start when the underlying structures are not propagated. - c.usable.RLock() - defer c.usable.RUnlock() + c.ready.wait() // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -272,13 +263,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f return err } - // To avoid situation when List is processed before the underlying - // watchCache is propagated for the first time, we acquire and immediately - // release the 'usable' lock. - // We don't need to hold it all the time, because watchCache is thread-safe - // and it would complicate already very difficult locking pattern. - c.usable.RLock() - c.usable.RUnlock() + c.ready.wait() // List elements from cache, with at least 'listRV'. listPtr, err := meta.GetItemsPtr(listObj) @@ -381,18 +366,13 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi // Returns resource version to which the underlying cache is synced. func (c *Cacher) LastSyncResourceVersion() (uint64, error) { - // To avoid situation when LastSyncResourceVersion is processed before the - // underlying watchCache is propagated, we acquire 'usable' lock. - c.usable.RLock() - defer c.usable.RUnlock() - - c.RLock() - defer c.RUnlock() + c.ready.wait() resourceVersion := c.reflector.LastSyncResourceVersion() if resourceVersion == "" { return 0, nil } + return strconv.ParseUint(resourceVersion, 10, 64) } @@ -591,3 +571,27 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin } } } + +type ready struct { + ok bool + c *sync.Cond +} + +func newReady() *ready { + return &ready{c: sync.NewCond(&sync.Mutex{})} +} + +func (r *ready) wait() { + r.c.L.Lock() + for !r.ok { + r.c.Wait() + } + r.c.L.Unlock() +} + +func (r *ready) set(ok bool) { + r.c.L.Lock() + defer r.c.L.Unlock() + r.ok = ok + r.c.Broadcast() +}