From 8910abfdf3d13faf2bf415dc1019006fb06cbc9f Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Thu, 21 Mar 2019 17:11:58 +0800 Subject: [PATCH] delivery event non blocking firstly --- .../apiserver/pkg/storage/cacher/cacher.go | 80 ++++++++++++++----- 1 file changed, 59 insertions(+), 21 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 0f5c0848dae..dbad37a3790 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -215,6 +215,8 @@ type Cacher struct { // watchersBuffer is a list of watchers potentially interested in currently // dispatched event. watchersBuffer []*cacheWatcher + // blockedWatchers is a list of watchers whose buffer is currently full. + blockedWatchers []*cacheWatcher // watchersToStop is a list of watchers that were supposed to be stopped // during current dispatching, but stopping was deferred to the end of // dispatching that event to avoid race with closing channels in watchers. @@ -667,8 +669,39 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { c.startDispatching(event) // Since add() can block, we explicitly add when cacher is unlocked. + + // Dispatching event in nonblocking way first, which make faster watchers + // not be blocked by slower ones. + c.blockedWatchers = c.blockedWatchers[:0] for _, watcher := range c.watchersBuffer { - watcher.add(event, c.timer, c.dispatchTimeoutBudget) + if !watcher.nonblockingAdd(event) { + c.blockedWatchers = append(c.blockedWatchers, watcher) + } + } + + if len(c.blockedWatchers) > 0 { + // dispatchEvent is called very often, so arrange + // to reuse timers instead of constantly allocating. + startTime := time.Now() + timeout := c.dispatchTimeoutBudget.takeAvailable() + c.timer.Reset(timeout) + + // Make sure every watcher will try to send event without blocking first, + // even if the timer has already expired. + timer := c.timer + for _, watcher := range c.blockedWatchers { + if !watcher.add(event, timer) { + // No time left, clean the timer by set it to nil. + timer = nil + } + } + if !c.timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-c.timer.C + } + + c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime)) } c.finishDispatching() @@ -913,30 +946,23 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { - // Try to send the event immediately, without blocking. +func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { select { case c.input <- event: - return + return true default: + return false + } +} + +// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { + // Try to send the event immediately, without blocking. + if c.nonblockingAdd(event) { + return true } - // OK, block sending, but only for up to . - // cacheWatcher.add is called very often, so arrange - // to reuse timers instead of constantly allocating. - startTime := time.Now() - timeout := budget.takeAvailable() - - timer.Reset(timeout) - - select { - case c.input <- event: - if !timer.Stop() { - // Consume triggered (but not yet received) timer event - // so that future reuse does not get a spurious timeout. - <-timer.C - } - case <-timer.C: + closeFunc := func() { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. @@ -944,7 +970,19 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti c.forget() } - budget.returnUnused(timeout - time.Since(startTime)) + if timer == nil { + closeFunc() + return false + } + + // OK, block sending, but only until timer fires. + select { + case c.input <- event: + return true + case <-timer.C: + closeFunc() + return false + } } // NOTE: sendWatchCacheEvent is assumed to not modify !!!