diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index f246085545b..20acacc9a87 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -497,14 +497,39 @@ func (c *cacheWatcher) stop() { } } +var timerPool sync.Pool + func (c *cacheWatcher) add(event watchCacheEvent) { - t := time.NewTimer(5 * time.Second) - defer t.Stop() + // Try to send the event immediately, without blocking. select { case c.input <- event: + return + default: + } + + // OK, block sending, but only for up to 5 seconds. + // cacheWatcher.add is called very often, so arrange + // to reuse timers instead of constantly allocating. + const timeout = 5 * time.Second + t, ok := timerPool.Get().(*time.Timer) + if ok { + t.Reset(timeout) + } else { + t = time.NewTimer(timeout) + } + defer timerPool.Put(t) + + select { + case c.input <- event: + stopped := t.Stop() + if !stopped { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-t.C + } case <-t.C: // This means that we couldn't send event to that watcher. - // Since we don't want to blockin on it infinitely, + // Since we don't want to block on it infinitely, // we simply terminate it. c.forget(false) c.stop()