From 378cd81dbe7b239d56df466b85c1542839b6251e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 8 Sep 2016 12:02:27 +0200 Subject: [PATCH] Split dispatching to watchers in Cacher into separate goroutine. --- pkg/storage/cacher.go | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index dcd88cbc44a..cacb34f6339 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -161,6 +161,9 @@ type Cacher struct { watcherIdx int watchers indexedWatchers + // Incoming events that should be dispatched to watchers. + incoming chan watchCacheEvent + // Handling graceful termination. stopLock sync.RWMutex stopped bool @@ -197,6 +200,8 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { allWatchers: make(map[int]*cacheWatcher), valueWatchers: make(map[string]watchersMap), }, + // TODO: Figure out the correct value for the buffer size. + incoming: make(chan watchCacheEvent, 100), // We need to (potentially) stop both: // - wait.Until go-routine // - reflector.ListAndWatch @@ -205,6 +210,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { stopCh: make(chan struct{}), } watchCache.SetOnEvent(cacher.processEvent) + go cacher.dispatchEvents() stopCh := cacher.stopCh cacher.stopWg.Add(1) @@ -403,14 +409,26 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) { return result, len(result) > 0 } -// TODO: Most probably splitting this method to a separate thread will visibily -// improve throughput of our watch machinery. So what we should do is to: -// - OnEvent handler simply put an element to channel -// - processEvent be another goroutine processing events from that channel -// Additionally, if we make this channel buffered, cacher will be more resistant -// to single watchers being slow - see cacheWatcher::add method. func (c *Cacher) processEvent(event watchCacheEvent) { - triggerValues, supported := c.triggerValues(&event) + c.incoming <- event +} + +func (c *Cacher) dispatchEvents() { + for { + select { + case event, ok := <-c.incoming: + if !ok { + return + } + c.dispatchEvent(&event) + case <-c.stopCh: + return + } + } +} + +func (c *Cacher) dispatchEvent(event *watchCacheEvent) { + triggerValues, supported := c.triggerValues(event) c.Lock() defer c.Unlock() @@ -614,10 +632,10 @@ func (c *cacheWatcher) stop() { var timerPool sync.Pool -func (c *cacheWatcher) add(event watchCacheEvent) { +func (c *cacheWatcher) add(event *watchCacheEvent) { // Try to send the event immediately, without blocking. select { - case c.input <- event: + case c.input <- *event: return default: } @@ -636,7 +654,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) { defer timerPool.Put(t) select { - case c.input <- event: + case c.input <- *event: stopped := t.Stop() if !stopped { // Consume triggered (but not yet received) timer event