diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index b004b50538c..a84778c3e25 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -532,17 +532,23 @@ func (c *Cacher) dispatchEvents() { func (c *Cacher) dispatchEvent(event *watchCacheEvent) { triggerValues, supported := c.triggerValues(event) + // TODO: For now we assume we have a given budget for dispatching + // a single event. We should consider changing to the approach with: + // - budget has upper bound at + // - we add to current timeout every second + timeout := time.Duration(250) * time.Millisecond + c.Lock() defer c.Unlock() // Iterate over "allWatchers" no matter what the trigger function is. for _, watcher := range c.watchers.allWatchers { - watcher.add(event) + watcher.add(event, &timeout) } if supported { // Iterate over watchers interested in the given values of the trigger. for _, triggerValue := range triggerValues { for _, watcher := range c.watchers.valueWatchers[triggerValue] { - watcher.add(event) + watcher.add(event, &timeout) } } } else { @@ -555,7 +561,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // Iterate over watchers interested in exact values for all values. for _, watchers := range c.watchers.valueWatchers { for _, watcher := range watchers { - watcher.add(event) + watcher.add(event, &timeout) } } } @@ -729,7 +735,7 @@ func (c *cacheWatcher) stop() { var timerPool sync.Pool -func (c *cacheWatcher) add(event *watchCacheEvent) { +func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) { // Try to send the event immediately, without blocking. select { case c.input <- *event: @@ -737,20 +743,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent) { default: } - // OK, block sending, but only for up to 5 seconds. + // OK, block sending, but only for up to . // cacheWatcher.add is called very often, so arrange // to reuse timers instead of constantly allocating. - trace := util.NewTrace( - fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)", - reflect.TypeOf(event.Object).String(), len(c.result))) - defer trace.LogIfLong(50 * time.Millisecond) + startTime := time.Now() - const timeout = 5 * time.Second t, ok := timerPool.Get().(*time.Timer) if ok { - t.Reset(timeout) + t.Reset(*timeout) } else { - t = time.NewTimer(timeout) + t = time.NewTimer(*timeout) } defer timerPool.Put(t) @@ -769,6 +771,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) { c.forget(false) c.stop() } + + if *timeout = *timeout - time.Since(startTime); *timeout < 0 { + *timeout = 0 + } } // NOTE: sendWatchCacheEvent is assumed to not modify !!! diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 30d52dc1c99..86d806fa0d2 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -297,12 +297,15 @@ func TestWatcherTimeout(t *testing.T) { } startVersion := strconv.Itoa(int(initialVersion)) - // Create a watcher that will not be reading any result. - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) - if err != nil { - t.Fatalf("Unexpected error: %v", err) + // Create a number of watchers that will not be reading any result. + nonReadingWatchers := 50 + for i := 0; i < nonReadingWatchers; i++ { + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() } - defer watcher.Stop() // Create a second watcher that will be reading result. readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) @@ -311,11 +314,15 @@ func TestWatcherTimeout(t *testing.T) { } defer readingWatcher.Stop() + startTime := time.Now() for i := 1; i <= 22; i++ { pod := makeTestPod(strconv.Itoa(i)) _ = updatePod(t, etcdStorage, pod, nil) verifyWatchEvent(t, readingWatcher, watch.Added, pod) } + if time.Since(startTime) > time.Duration(250*nonReadingWatchers)*time.Millisecond { + t.Errorf("waiting for events took too long: %v", time.Since(startTime)) + } } func TestFiltering(t *testing.T) {