diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 512a5775bed..ce82292fea9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -186,6 +186,9 @@ type Cacher struct { stopped bool stopCh chan struct{} stopWg sync.WaitGroup + + // Used to avoid unnecessary allocations in underlying watchers. + timer *time.Timer } // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from @@ -227,6 +230,7 @@ func NewCacherFromConfig(config Config) *Cacher { // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. stopCh: stopCh, + timer: time.NewTimer(time.Duration(0)), } watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() @@ -242,6 +246,14 @@ func NewCacherFromConfig(config Config) *Cacher { }, time.Second, stopCh, ) }() + + // Ensure that timer is stopped. + if !cacher.timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. + <-cacher.timer.C + } + return cacher } @@ -621,13 +633,13 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { defer c.Unlock() // Iterate over "allWatchers" no matter what the trigger function is. for _, watcher := range c.watchers.allWatchers { - watcher.add(event, c.dispatchTimeoutBudget) + watcher.add(event, c.timer, c.dispatchTimeoutBudget) } if supported { // Iterate over watchers interested in the given values of the trigger. for _, triggerValue := range triggerValues { for _, watcher := range c.watchers.valueWatchers[triggerValue] { - watcher.add(event, c.dispatchTimeoutBudget) + watcher.add(event, c.timer, c.dispatchTimeoutBudget) } } } else { @@ -640,7 +652,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { // Iterate over watchers interested in exact values for all values. for _, watchers := range c.watchers.valueWatchers { for _, watcher := range watchers { - watcher.add(event, c.dispatchTimeoutBudget) + watcher.add(event, c.timer, c.dispatchTimeoutBudget) } } } @@ -826,9 +838,7 @@ func (c *cacheWatcher) stop() { } } -var timerPool sync.Pool - -func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { // Try to send the event immediately, without blocking. select { case c.input <- event: @@ -842,23 +852,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) { startTime := time.Now() timeout := budget.takeAvailable() - t, ok := timerPool.Get().(*time.Timer) - if ok { - t.Reset(timeout) - } else { - t = time.NewTimer(timeout) - } - defer timerPool.Put(t) + timer.Reset(timeout) select { case c.input <- event: - stopped := t.Stop() - if !stopped { + if !timer.Stop() { // Consume triggered (but not yet received) timer event // so that future reuse does not get a spurious timeout. - <-t.C + <-timer.C } - case <-t.C: + case <-timer.C: // This means that we couldn't send event to that watcher. // Since we don't want to block on it infinitely, // we simply terminate it.