diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 94b710fb5f0..0f5c0848dae 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -98,14 +98,17 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
 	wm[number] = w
 }
 
-func (wm watchersMap) deleteWatcher(number int) {
-	delete(wm, number)
+func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
+	if watcher, ok := wm[number]; ok {
+		delete(wm, number)
+		done(watcher)
+	}
 }
 
-func (wm watchersMap) terminateAll() {
+func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
 	for key, watcher := range wm {
 		delete(wm, key)
-		watcher.stop()
+		done(watcher)
 	}
 }
 
@@ -125,24 +128,24 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string,
 	}
 }
 
-func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
+func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) {
 	if supported {
-		i.valueWatchers[value].deleteWatcher(number)
+		i.valueWatchers[value].deleteWatcher(number, done)
 		if len(i.valueWatchers[value]) == 0 {
 			delete(i.valueWatchers, value)
 		}
 	} else {
-		i.allWatchers.deleteWatcher(number)
+		i.allWatchers.deleteWatcher(number, done)
 	}
 }
 
-func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
+func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
 	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
 		klog.Warningf("Terminating all watchers from cacher %v", objectType)
 	}
-	i.allWatchers.terminateAll()
+	i.allWatchers.terminateAll(done)
 	for index, watchers := range i.valueWatchers {
-		watchers.terminateAll()
+		watchers.terminateAll(done)
 		delete(i.valueWatchers, index)
 	}
 }
 
@@ -203,8 +206,19 @@ type Cacher struct {
 	stopCh chan struct{}
 	stopWg sync.WaitGroup
 
-	// Used to avoid unnecessary allocations in underlying watchers.
+	// timer is used to avoid unnecessary allocations in underlying watchers.
 	timer *time.Timer
+
+	// dispatching determines whether dispatching of any event is currently
+	// in flight.
+	dispatching bool
+	// watchersBuffer is a list of watchers potentially interested in the
+	// currently dispatched event.
+	watchersBuffer []*cacheWatcher
+	// watchersToStop is a list of watchers that were supposed to be stopped
+	// during current dispatching, but stopping was deferred to the end of
+	// dispatching that event to avoid race with closing channels in watchers.
+	watchersToStop []*cacheWatcher
 }
 
 // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@@ -650,19 +664,41 @@ func (c *Cacher) dispatchEvents() {
 }
 
 func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
+	c.startDispatching(event)
+
+	// Since add() can block, we explicitly add when cacher is unlocked.
+	for _, watcher := range c.watchersBuffer {
+		watcher.add(event, c.timer, c.dispatchTimeoutBudget)
+	}
+
+	c.finishDispatching()
+}
+
+// startDispatching chooses watchers potentially interested in a given event
+// and marks dispatching as true.
+func (c *Cacher) startDispatching(event *watchCacheEvent) {
 	triggerValues, supported := c.triggerValues(event)
 
 	c.Lock()
 	defer c.Unlock()
+
+	c.dispatching = true
+	// We are reusing the slice to avoid memory reallocations in every
+	// dispatchEvent() call. That may prevent Go GC from freeing items
+	// from previous phases that are sitting behind the current length
+	// of the slice, but there is only a limited number of those and the
+	// gain from avoiding memory allocations is much bigger.
+	c.watchersBuffer = c.watchersBuffer[:0]
+
 	// Iterate over "allWatchers" no matter what the trigger function is.
 	for _, watcher := range c.watchers.allWatchers {
-		watcher.add(event, c.timer, c.dispatchTimeoutBudget)
+		c.watchersBuffer = append(c.watchersBuffer, watcher)
 	}
 	if supported {
 		// Iterate over watchers interested in the given values of the trigger.
 		for _, triggerValue := range triggerValues {
 			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
-				watcher.add(event, c.timer, c.dispatchTimeoutBudget)
+				c.watchersBuffer = append(c.watchersBuffer, watcher)
 			}
 		}
 	} else {
@@ -675,16 +711,38 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
 		// Iterate over watchers interested in exact values for all values.
 		for _, watchers := range c.watchers.valueWatchers {
 			for _, watcher := range watchers {
-				watcher.add(event, c.timer, c.dispatchTimeoutBudget)
+				c.watchersBuffer = append(c.watchersBuffer, watcher)
 			}
 		}
 	}
 }
 
+// finishDispatching stops all the watchers whose stopping was requested
+// in the meantime but deferred, to avoid closing input channels of
+// watchers while add() may still be writing to them.
+// It also marks dispatching as false.
+func (c *Cacher) finishDispatching() {
+	c.Lock()
+	defer c.Unlock()
+	c.dispatching = false
+	for _, watcher := range c.watchersToStop {
+		watcher.stop()
+	}
+	c.watchersToStop = c.watchersToStop[:0]
+}
+
 func (c *Cacher) terminateAllWatchers() {
 	c.Lock()
 	defer c.Unlock()
-	c.watchers.terminateAll(c.objectType)
+	c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
+}
+
+func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
+	if c.dispatching {
+		c.watchersToStop = append(c.watchersToStop, watcher)
+	} else {
+		watcher.stop()
+	}
 }
 
 func (c *Cacher) isStopped() bool {
@@ -710,20 +768,15 @@ func (c *Cacher) Stop() {
 	c.stopWg.Wait()
 }
 
-func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
-	return func(lock bool) {
-		if lock {
-			c.Lock()
-			defer c.Unlock()
-		} else {
-			// false is currently passed only if we are forcing watcher to close due
-			// to its unresponsiveness and blocking other watchers.
-			// TODO: Get this information in cleaner way.
-			klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
-		}
+func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func() {
+	return func() {
+		c.Lock()
+		defer c.Unlock()
+
 		// It's possible that the watcher is already not in the structure (e.g. in case of
-		// simultaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
-		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
+		// simultaneous Stop() and terminateAllWatchers()), but it is safe to call stop()
+		// on a watcher multiple times.
+		c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
 	}
 }
 
@@ -822,11 +875,11 @@ type cacheWatcher struct {
 	done      chan struct{}
 	filter    filterWithAttrsFunc
 	stopped   bool
-	forget    func(bool)
+	forget    func()
 	versioner storage.Versioner
 }
 
-func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner) *cacheWatcher {
+func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
 	watcher := &cacheWatcher{
 		input:  make(chan *watchCacheEvent, chanSize),
 		result: make(chan watch.Event, chanSize),
@@ -847,8 +900,7 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
 
 // Implements watch.Interface.
 func (c *cacheWatcher) Stop() {
-	c.forget(true)
-	c.stop()
+	c.forget()
 }
 
 func (c *cacheWatcher) stop() {
@@ -888,8 +940,8 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti
 		// This means that we couldn't send event to that watcher.
 		// Since we don't want to block on it infinitely,
 		// we simply terminate it.
-		c.forget(false)
-		c.stop()
+		klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
+		c.forget()
 	}
 
 	budget.returnUnused(timeout - time.Since(startTime))
@@ -982,11 +1034,7 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui
 	defer close(c.result)
 	defer c.Stop()
 
-	for {
-		event, ok := <-c.input
-		if !ok {
-			return
-		}
+	for event := range c.input {
 		// only send events newer than resourceVersion
 		if event.ResourceVersion > resourceVersion {
 			c.sendWatchCacheEvent(event)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index e18726a637d..2de915eb5a9 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -45,12 +45,17 @@ import (
 // the writes to cacheWatcher.result channel is blocked.
 func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	var lock sync.RWMutex
+	var w *cacheWatcher
 	count := 0
 	filter := func(string, labels.Set, fields.Set) bool { return true }
-	forget := func(bool) {
+	forget := func() {
 		lock.Lock()
 		defer lock.Unlock()
 		count++
+		// forget() has to stop the watcher, as only stopping the watcher
+		// triggers stopping the process() goroutine, which this test is
+		// ultimately waiting for.
+		w.stop()
 	}
 	initEvents := []*watchCacheEvent{
 		{Object: &v1.Pod{}},
@@ -58,7 +63,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
+	w = newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -73,7 +78,7 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
 	filter := func(_ string, _ labels.Set, field fields.Set) bool {
 		return field["spec.nodeName"] == "host"
 	}
-	forget := func(bool) {}
+	forget := func() {}
 
 	testCases := []struct {
 		events []*watchCacheEvent
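
A note on the synchronization pattern above, for reviewers: the invariant this patch establishes is that a watcher's input channel is never closed while dispatchEvent() may still be sending to it (a send on a closed channel panics). Stops requested while an event is in flight are queued in watchersToStop and executed in finishDispatching(). The following is a minimal, self-contained sketch of that deferred-stop pattern, not the cacher code itself; all names in it (dispatcher, watcher, stopWatcherLocked, and so on) are invented for illustration.

package main

import (
	"fmt"
	"sync"
)

// watcher owns an input channel that the dispatcher sends events into.
type watcher struct {
	sync.Mutex
	input   chan string
	stopped bool
}

// stop is idempotent, mirroring cacheWatcher.stop(): closing input
// twice would panic, so a stopped flag guards the close.
func (w *watcher) stop() {
	w.Lock()
	defer w.Unlock()
	if !w.stopped {
		w.stopped = true
		close(w.input)
	}
}

// dispatcher mirrors the Cacher fields this patch adds.
type dispatcher struct {
	sync.Mutex
	watchers       []*watcher
	dispatching    bool
	watchersBuffer []*watcher // reused across events to avoid reallocations
	watchersToStop []*watcher // stops deferred until dispatching finishes
}

// stopWatcherLocked corresponds to Cacher.stopWatcherThreadUnsafe: the
// caller must hold d's lock. While an event is in flight, the stop is
// queued instead of executed, so no channel is closed under a
// concurrent send.
func (d *dispatcher) stopWatcherLocked(w *watcher) {
	if d.dispatching {
		d.watchersToStop = append(d.watchersToStop, w)
	} else {
		w.stop()
	}
}

func (d *dispatcher) dispatch(event string) {
	// Phase 1 (startDispatching): snapshot the watchers under the lock.
	d.Lock()
	d.dispatching = true
	d.watchersBuffer = d.watchersBuffer[:0]
	d.watchersBuffer = append(d.watchersBuffer, d.watchers...)
	d.Unlock()

	// Phase 2: send with the lock released, since sends can block.
	for _, w := range d.watchersBuffer {
		w.input <- event
	}

	// Phase 3 (finishDispatching): run the deferred stops now that no
	// send can be in flight.
	d.Lock()
	d.dispatching = false
	for _, w := range d.watchersToStop {
		w.stop()
	}
	d.watchersToStop = d.watchersToStop[:0]
	d.Unlock()
}

func main() {
	d := &dispatcher{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		w := &watcher{input: make(chan string, 1)}
		d.watchers = append(d.watchers, w)
		wg.Add(1)
		go func(id int, w *watcher) {
			defer wg.Done()
			for ev := range w.input { // exits once stop() closes input
				fmt.Printf("watcher %d received %q\n", id, ev)
			}
		}(i, w)
	}
	d.dispatch("event-1")
	// A stop outside of dispatching closes the channel immediately.
	d.Lock()
	for _, w := range d.watchers {
		d.stopWatcherLocked(w)
	}
	d.Unlock()
	wg.Wait()
}

The real patch additionally removes the watcher from the cacher's maps via deleteWatcher's done callback before deciding whether to stop it; that bookkeeping is omitted here to keep the sketch focused on why the stop must wait until dispatching finishes.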