Merge pull request #26860 from xiang90/cvar
Automatic merge from submit-queue

cacher: replace usable lock with conditional variable

Previously we used a rwlock to indicate the readiness of the cacher. I feel this is not straightforward, and it also requires a few comments to explain. This pull request replaces the lock with a conditional variable for readability reasons.

/cc @lavalamp @wojtek-t
This commit is contained in commit ef3f2fb157.
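For context, the pattern being removed used an RWMutex purely as a readiness gate: the constructor took the write lock, the first successful list released it from a callback, and every reader briefly took the read lock just to block until initialization had finished. Below is a minimal, self-contained sketch of that pattern; the cache/newCache/initialize/get names and the timing are invented for illustration and are not part of the commit.

package main

import (
    "fmt"
    "sync"
    "time"
)

// cache mimics the old 'usable' pattern: the constructor takes the write
// lock, initialization releases it, and readers take the read lock only to
// block until the data has been populated.
type cache struct {
    usable sync.RWMutex
    data   map[string]string
}

func newCache() *cache {
    c := &cache{data: map[string]string{}}
    // Held until initialize() has populated the cache.
    c.usable.Lock()
    return c
}

func (c *cache) initialize() {
    time.Sleep(50 * time.Millisecond) // stand-in for the initial list from storage
    c.data["pod"] = "running"
    // Unlocks a mutex that was locked in a different code path (newCache).
    c.usable.Unlock()
}

func (c *cache) get(key string) string {
    // Acquire and immediately release the read lock just to wait for readiness.
    c.usable.RLock()
    defer c.usable.RUnlock()
    return c.data[key]
}

func main() {
    c := newCache()
    go c.initialize()
    fmt.Println(c.get("pod")) // blocks until initialize() unlocks 'usable'
}

The awkward part is that the mutex is locked in one code path and unlocked in another, and readers acquire it only to release it immediately. That is the behavior the condition-variable gate introduced in this diff replaces.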
@@ -72,19 +72,13 @@ type CacherConfig struct {
 type Cacher struct {
     sync.RWMutex
 
-    // Each user-facing method that is not simply redirected to the underlying
-    // storage has to read-lock on this mutex before starting any processing.
+    // Before accessing the cacher's cache, wait for the ready to be ok.
     // This is necessary to prevent users from accessing structures that are
     // uninitialized or are being repopulated right now.
-    // NOTE: We cannot easily reuse the main mutex for it due to multi-threaded
-    // interactions of Cacher with the underlying WatchCache. Since Cacher is
-    // caling WatchCache directly and WatchCache is calling Cacher methods
-    // via its OnEvent and OnReplace hooks, we explicitly assume that if mutexes
-    // of both structures are held, the one from WatchCache is acquired first
-    // to avoid deadlocks. Unfortunately, forcing this rule in startCaching
-    // would be very difficult and introducing one more mutex seems to be much
-    // easier.
-    usable sync.RWMutex
+    // ready needs to be set to false when the cacher is paused or stopped.
+    // ready needs to be set to true when the cacher is ready to use after
+    // initialization.
+    ready *ready
 
     // Underlying storage.Interface.
     storage Interface
@@ -126,6 +120,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
     }
 
     cacher := &Cacher{
+        ready:      newReady(),
         storage:    config.Storage,
         watchCache: watchCache,
         reflector:  cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
@@ -139,8 +134,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
         // So we will be simply closing the channel, and synchronizing on the WaitGroup.
         stopCh: make(chan struct{}),
     }
-    // See startCaching method for explanation and where this is unlocked.
-    cacher.usable.Lock()
     watchCache.SetOnEvent(cacher.processEvent)
 
     stopCh := cacher.stopCh
@@ -168,11 +161,11 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
     successfulList := false
     c.watchCache.SetOnReplace(func() {
         successfulList = true
-        c.usable.Unlock()
+        c.ready.set(true)
     })
     defer func() {
         if successfulList {
-            c.usable.Lock()
+            c.ready.set(false)
         }
     }()
 
@@ -213,9 +206,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
         return nil, err
     }
 
-    // Do NOT allow Watch to start when the underlying structures are not propagated.
-    c.usable.RLock()
-    defer c.usable.RUnlock()
+    c.ready.wait()
 
     // We explicitly use thread unsafe version and do locking ourself to ensure that
     // no new events will be processed in the meantime. The watchCache will be unlocked
@@ -272,13 +263,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
         return err
     }
 
-    // To avoid situation when List is processed before the underlying
-    // watchCache is propagated for the first time, we acquire and immediately
-    // release the 'usable' lock.
-    // We don't need to hold it all the time, because watchCache is thread-safe
-    // and it would complicate already very difficult locking pattern.
-    c.usable.RLock()
-    c.usable.RUnlock()
+    c.ready.wait()
 
     // List elements from cache, with at least 'listRV'.
     listPtr, err := meta.GetItemsPtr(listObj)
@@ -381,18 +366,13 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
 
 // Returns resource version to which the underlying cache is synced.
 func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
-    // To avoid situation when LastSyncResourceVersion is processed before the
-    // underlying watchCache is propagated, we acquire 'usable' lock.
-    c.usable.RLock()
-    defer c.usable.RUnlock()
 
-    c.RLock()
-    defer c.RUnlock()
+    c.ready.wait()
 
     resourceVersion := c.reflector.LastSyncResourceVersion()
     if resourceVersion == "" {
         return 0, nil
     }
 
     return strconv.ParseUint(resourceVersion, 10, 64)
 }
@@ -591,3 +571,27 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
         }
     }
 }
+
+type ready struct {
+    ok bool
+    c  *sync.Cond
+}
+
+func newReady() *ready {
+    return &ready{c: sync.NewCond(&sync.Mutex{})}
+}
+
+func (r *ready) wait() {
+    r.c.L.Lock()
+    for !r.ok {
+        r.c.Wait()
+    }
+    r.c.L.Unlock()
+}
+
+func (r *ready) set(ok bool) {
+    r.c.L.Lock()
+    defer r.c.L.Unlock()
+    r.ok = ok
+    r.c.Broadcast()
+}
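The ready type added at the end of the diff is a small condition-variable gate: wait() blocks until ok is true, and set() flips the flag and broadcasts to all waiters. The sketch below reuses that type as-is and adds an illustrative main (the waiter count and sleep are not part of the commit) to show callers such as Watch and List blocking until the cacher has been initialized.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Same shape as the ready type added in this commit.
type ready struct {
    ok bool
    c  *sync.Cond
}

func newReady() *ready {
    return &ready{c: sync.NewCond(&sync.Mutex{})}
}

func (r *ready) wait() {
    r.c.L.Lock()
    for !r.ok { // re-check after every wakeup; Broadcast wakes all waiters
        r.c.Wait()
    }
    r.c.L.Unlock()
}

func (r *ready) set(ok bool) {
    r.c.L.Lock()
    defer r.c.L.Unlock()
    r.ok = ok
    r.c.Broadcast()
}

func main() {
    r := newReady()

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            r.wait() // blocks until r.set(true) below
            fmt.Printf("waiter %d: cache is ready\n", id)
        }(i)
    }

    time.Sleep(50 * time.Millisecond) // stand-in for the initial list/replace
    r.set(true)
    wg.Wait()
}

The for loop around Wait() matters: Broadcast wakes every waiter, and each one must re-check ok because set(false) can make the cacher unready again while it is paused or stopped.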