Increase buffer sizes in cacher for watchers interested in all/many objects.

This commit is contained in:
Wojciech Tyczynski 2016-10-13 15:20:05 +02:00
parent aa485ce82c
commit 2298e1746c

View File

@ -310,10 +310,25 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
triggerValue, triggerSupported = matchValues[0].Value, true
}
// If there is triggerFunc defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently triggerFunc is defined only for Pods and Nodes,
// and there is only constant number of watchers for which triggerSupported
// is false (excluding those issues explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.triggerFunc != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, pred), forget)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
@ -610,10 +625,10 @@ type cacheWatcher struct {
forget func(bool)
}
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
input: make(chan watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
filter: filter,
stopped: false,
forget: forget,
@ -728,7 +743,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or sth else) and potentially
// consider increase size of result buffer in those cases.
const initProcessThreshold = 50 * time.Millisecond
const initProcessThreshold = 100 * time.Millisecond
startTime := time.Now()
for _, event := range initEvents {
c.sendWatchCacheEvent(event)
@ -739,7 +754,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
if len(initEvents) > 0 {
objType = reflect.TypeOf(initEvents[0].Object).String()
}
glog.V(2).Infof("processing %d initEvents of %stook %v", len(initEvents), objType, processingTime)
glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
}
defer close(c.result)