cacher: replace usable lock with conditional variable

This commit is contained in:
Xiang Li 2016-06-05 08:08:31 -07:00
parent 11211a49e3
commit aa472ff734

View File

@ -72,19 +72,13 @@ type CacherConfig struct {
type Cacher struct {
sync.RWMutex
// Each user-facing method that is not simply redirected to the underlying
// storage has to read-lock on this mutex before starting any processing.
// Before accessing the cacher's cache, wait for the ready to be ok.
// This is necessary to prevent users from accessing structures that are
// uninitialized or are being repopulated right now.
// NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
// interactions of Cacher with the underlying WatchCache. Since Cacher is
// caling WatchCache directly and WatchCache is calling Cacher methods
// via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
// of both structures are held, the one from WatchCache is acquired first
// to avoid deadlocks. Unfortunately, forcing this rule in startCaching
// would be very difficult and introducing one more mutex seems to be much
// easier.
usable sync.RWMutex
// ready needs to be set to false when the cacher is paused or stopped.
// ready needs to be set to true when the cacher is ready to use after
// initialization.
ready *ready
// Underlying storage.Interface.
storage Interface
@ -126,6 +120,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
@ -139,8 +134,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: make(chan struct{}),
}
// See startCaching method for explanation and where this is unlocked.
cacher.usable.Lock()
watchCache.SetOnEvent(cacher.processEvent)
stopCh := cacher.stopCh
@ -168,11 +161,11 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.usable.Unlock()
c.ready.set(true)
})
defer func() {
if successfulList {
c.usable.Lock()
c.ready.set(false)
}
}()
@ -213,9 +206,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return nil, err
}
// Do NOT allow Watch to start when the underlying structures are not propagated.
c.usable.RLock()
defer c.usable.RUnlock()
c.ready.wait()
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
@ -272,13 +263,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
return err
}
// To avoid situation when List is processed before the underlying
// watchCache is propagated for the first time, we acquire and immediately
// release the 'usable' lock.
// We don't need to hold it all the time, because watchCache is thread-safe
// and it would complicate already very difficult locking pattern.
c.usable.RLock()
c.usable.RUnlock()
c.ready.wait()
// List elements from cache, with at least 'listRV'.
listPtr, err := meta.GetItemsPtr(listObj)
@ -381,18 +366,13 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
// Returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// To avoid situation when LastSyncResourceVersion is processed before the
// underlying watchCache is propagated, we acquire 'usable' lock.
c.usable.RLock()
defer c.usable.RUnlock()
c.RLock()
defer c.RUnlock()
c.ready.wait()
resourceVersion := c.reflector.LastSyncResourceVersion()
if resourceVersion == "" {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
}
@ -591,3 +571,27 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
}
}
}
type ready struct {
ok bool
c *sync.Cond
}
func newReady() *ready {
return &ready{c: sync.NewCond(&sync.Mutex{})}
}
func (r *ready) wait() {
r.c.L.Lock()
for !r.ok {
r.c.Wait()
}
r.c.L.Unlock()
}
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.ok = ok
r.c.Broadcast()
}