From 5e493ab467472f38d7e78b19180bb6c7684170f0 Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Sun, 28 Apr 2019 21:06:59 +0800 Subject: [PATCH] delivery event non blocking firstly --- .../apiserver/pkg/storage/cacher/cacher.go | 73 +++++++++++++------ .../storage/cacher/cacher_whitebox_test.go | 72 ++++++++++++++++++ 2 files changed, 124 insertions(+), 21 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 0e462f800da..10425d9dc0d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -276,6 +276,8 @@ type Cacher struct { // watchersBuffer is a list of watchers potentially interested in currently // dispatched event. watchersBuffer []*cacheWatcher + // blockedWatchers is a list of watchers whose buffer is currently full. + blockedWatchers []*cacheWatcher // watchersToStop is a list of watchers that were supposed to be stopped // during current dispatching, but stopping was deferred to the end of // dispatching that event to avoid race with closing channels in watchers. @@ -789,13 +791,45 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // Watchers stopped after startDispatching will be delayed to finishDispatching, // Since add() can block, we explicitly add when cacher is unlocked. + // Dispatching event in nonblocking way first, which make faster watchers + // not be blocked by slower ones. if event.Type == watch.Bookmark { for _, watcher := range c.watchersBuffer { watcher.nonblockingAdd(event) } } else { + c.blockedWatchers = c.blockedWatchers[:0] for _, watcher := range c.watchersBuffer { - watcher.add(event, c.timer, c.dispatchTimeoutBudget) + if !watcher.nonblockingAdd(event) { + c.blockedWatchers = append(c.blockedWatchers, watcher) + } + } + + if len(c.blockedWatchers) > 0 { + // dispatchEvent is called very often, so arrange + // to reuse timers instead of constantly allocating. + startTime := time.Now() + timeout := c.dispatchTimeoutBudget.takeAvailable() + c.timer.Reset(timeout) + + // Make sure every watcher will try to send event without blocking first, + // even if the timer has already expired. + timer := c.timer + for _, watcher := range c.blockedWatchers { + if !watcher.add(event, timer) { + // fired, clean the timer by set it to nil. + timer = nil + } + } + + // Stop the timer if it is not fired + if timer != nil && !timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-timer.C + } + + c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime)) } } } @@ -1078,7 +1112,6 @@ func (c *cacheWatcher) stop() { } func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { - // If we can't send it, don't block on it. select { case c.input <- event: return true @@ -1087,28 +1120,14 @@ func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { } } -func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { +// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { // Try to send the event immediately, without blocking. if c.nonblockingAdd(event) { - return + return true } - // OK, block sending, but only for up to . - // cacheWatcher.add is called very often, so arrange - // to reuse timers instead of constantly allocating. - startTime := time.Now() - timeout := budget.takeAvailable() - - timer.Reset(timeout) - - select { - case c.input <- event: - if !timer.Stop() { - // Consume triggered (but not yet received) timer event - // so that future reuse does not get a spurious timeout. - <-timer.C - } - case <-timer.C: + closeFunc := func() { // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it. @@ -1116,7 +1135,19 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti c.forget() } - budget.returnUnused(timeout - time.Since(startTime)) + if timer == nil { + closeFunc() + return false + } + + // OK, block sending, but only until timer fires. + select { + case c.input <- event: + return true + case <-timer.C: + closeFunc() + return false + } } func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 4e51a6979b5..1c9481649f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -743,3 +743,75 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { wg.Wait() } } + +func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _ := newTestCacher(backingStorage, 1000) + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + // Ensure there is some budget for slowing down processing. + cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond) + + makePod := func(i int) *examplev1.Pod { + return &examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", 1000+i), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", 1000+i), + }, + } + } + if err := cacher.watchCache.Add(makePod(0)); err != nil { + t.Errorf("error: %v", err) + } + + totalPods := 50 + + // Create watcher that will be blocked. + w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + defer w1.Stop() + + // Create fast watcher and ensure it will get all objects. + w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + defer w2.Stop() + + // Now push a ton of object to cache. + for i := 1; i < totalPods; i++ { + cacher.watchCache.Add(makePod(i)) + } + + shouldContinue := true + eventsCount := 0 + for shouldContinue { + select { + case event, ok := <-w2.ResultChan(): + if !ok { + shouldContinue = false + break + } + // Ensure there is some budget for fast watcher after slower one is blocked. + cacher.dispatchTimeoutBudget.returnUnused(50 * time.Millisecond) + if event.Type == watch.Added { + eventsCount++ + if eventsCount == totalPods { + shouldContinue = false + } + } + case <-time.After(2 * time.Second): + shouldContinue = false + w2.Stop() + } + } + if eventsCount != totalPods { + t.Errorf("watcher is blocked by slower one (count: %d)", eventsCount) + } +}