mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 20:42:26 +00:00
Fix deadlock in the cacher
This commit is contained in:
parent
b5a4a548df
commit
90bf334fd1
11
pkg/client/unversioned/cache/watch_cache.go
vendored
11
pkg/client/unversioned/cache/watch_cache.go
vendored
@ -252,10 +252,7 @@ func (w *WatchCache) SetOnEvent(onEvent func(WatchCacheEvent)) {
|
|||||||
w.onEvent = onEvent
|
w.onEvent = onEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) {
|
func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]WatchCacheEvent, error) {
|
||||||
w.RLock()
|
|
||||||
defer w.RUnlock()
|
|
||||||
|
|
||||||
size := w.endIndex - w.startIndex
|
size := w.endIndex - w.startIndex
|
||||||
oldest := w.resourceVersion
|
oldest := w.resourceVersion
|
||||||
if size > 0 {
|
if size > 0 {
|
||||||
@ -277,3 +274,9 @@ func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEven
|
|||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WatchCache) GetAllEventsSince(resourceVersion uint64) ([]WatchCacheEvent, error) {
|
||||||
|
w.RLock()
|
||||||
|
defer w.RUnlock()
|
||||||
|
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
|
||||||
|
}
|
||||||
|
@ -136,13 +136,20 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
|||||||
|
|
||||||
// Implements Watch (signature from storage.Interface).
|
// Implements Watch (signature from storage.Interface).
|
||||||
func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (c *Cacher) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||||
c.Lock()
|
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||||
defer c.Unlock()
|
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||||
|
// on return from this function.
|
||||||
initEvents, err := c.watchCache.GetAllEventsSince(resourceVersion)
|
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
|
||||||
|
// underlying watchCache is calling processEvent under its lock.
|
||||||
|
c.watchCache.RLock()
|
||||||
|
defer c.watchCache.RUnlock()
|
||||||
|
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
||||||
c.watchers[c.watcherIdx] = watcher
|
c.watchers[c.watcherIdx] = watcher
|
||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
|
Loading…
Reference in New Issue
Block a user