From 2298e1746c3d776ea0c987bbe26e675458ec5597 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 13 Oct 2016 15:20:05 +0200 Subject: [PATCH] Increase buffer sizes in cacher for watchers interested in all/many objects. --- pkg/storage/cacher.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 26581972609..2cd9d4483ec 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -310,10 +310,25 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, triggerValue, triggerSupported = matchValues[0].Value, true } + // If there is triggerFunc defined, but triggerSupported is false, + // we can't narrow the amount of events significantly at this point. + // + // That said, currently triggerFunc is defined only for Pods and Nodes, + // and there is only constant number of watchers for which triggerSupported + // is false (excluding those issues explicitly by users). + // Thus, to reduce the risk of those watchers blocking all watchers of a + // given resource in the system, we increase the sizes of buffers for them. + chanSize := 10 + if c.triggerFunc != nil && !triggerSupported { + // TODO: We should tune this value and ideally make it dependent on the + // number of objects of a given type and/or their churn. + chanSize = 1000 + } + c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, pred), forget) + watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -610,10 +625,10 @@ type cacheWatcher struct { forget func(bool) } -func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ - input: make(chan watchCacheEvent, 10), - result: make(chan watch.Event, 10), + input: make(chan watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), filter: filter, stopped: false, forget: forget, @@ -728,7 +743,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin // We should understand what is blocking us in those cases (e.g. // is it lack of CPU, network, or sth else) and potentially // consider increase size of result buffer in those cases. - const initProcessThreshold = 50 * time.Millisecond + const initProcessThreshold = 100 * time.Millisecond startTime := time.Now() for _, event := range initEvents { c.sendWatchCacheEvent(event) @@ -739,7 +754,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin if len(initEvents) > 0 { objType = reflect.TypeOf(initEvents[0].Object).String() } - glog.V(2).Infof("processing %d initEvents of %stook %v", len(initEvents), objType, processingTime) + glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime) } defer close(c.result)