From 0db5c05bdb8bbc510307a48cbade712583bb009e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 28 Apr 2022 11:56:41 +0200 Subject: [PATCH] Adjust watch channel sizes in watchcache --- .../apiserver/pkg/storage/cacher/cacher.go | 21 +-- .../pkg/storage/cacher/watch_cache.go | 54 ++++++++ .../pkg/storage/cacher/watch_cache_test.go | 126 ++++++++++++++++++ 3 files changed, 186 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index a6f8f3eade4..040e7c599ce 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -480,21 +480,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions } } - // If there is indexedTrigger defined, but triggerSupported is false, - // we can't narrow the amount of events significantly at this point. - // - // That said, currently indexedTrigger is defined only for couple resources: - // Pods, Nodes, Secrets and ConfigMaps and there is only a constant - // number of watchers for which triggerSupported is false (excluding those - // issued explicitly by users). - // Thus, to reduce the risk of those watchers blocking all watchers of a - // given resource in the system, we increase the sizes of buffers for them. - chanSize := 10 - if c.indexedTrigger != nil && !triggerSupported { - // TODO: We should tune this value and ideally make it dependent on the - // number of objects of a given type and/or their churn. - chanSize = 1000 - } + // It boils down to a tradeoff between: + // - having it as small as possible to reduce memory usage + // - having it large enough to ensure that watchers that need to process + // a bunch of changes have enough buffer to avoid from blocking other + // watchers on our watcher having a processing hiccup + chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported) // Determine watch timeout('0' means deadline is not set, ignore checking) deadline, _ := ctx.Deadline() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 65e9fde6591..9deff40704e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "math" "reflect" "sort" "sync" @@ -579,6 +580,59 @@ func (w *watchCache) Resync() error { return nil } +func (w *watchCache) currentCapacity() int { + w.Lock() + defer w.Unlock() + return w.capacity +} + +const ( + // minWatchChanSize is the min size of channels used by the watch. + // We keep that set to 10 for "backward compatibility" until we + // convince ourselves based on some metrics that decreasing is safe. + minWatchChanSize = 10 + // maxWatchChanSizeWithIndexAndTriger is the max size of the channel + // used by the watch using the index and trigger selector. + maxWatchChanSizeWithIndexAndTrigger = 10 + // maxWatchChanSizeWithIndexWithoutTrigger is the max size of the channel + // used by the watch using the index but without triggering selector. + // We keep that set to 1000 for "backward compatibility", until we + // convinced ourselves based on some metrics that decreasing is safe. + maxWatchChanSizeWithIndexWithoutTrigger = 1000 + // maxWatchChanSizeWithoutIndex is the max size of the channel + // used by the watch not using the index. + // TODO(wojtek-t): Figure out if the value shouldn't be higher. + maxWatchChanSizeWithoutIndex = 100 +) + +func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) int { + // To estimate the channel size we use a heuristic that a channel + // should roughly be able to keep one second of history. + // We don't have an exact data, but given we store updates from + // the last , we approach it by dividing the + // capacity by the length of the history window. + chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds())) + + // Finally we adjust the size to avoid ending with too low or + // to large values. + if chanSize < minWatchChanSize { + chanSize = minWatchChanSize + } + var maxChanSize int + switch { + case indexExists && triggerUsed: + maxChanSize = maxWatchChanSizeWithIndexAndTrigger + case indexExists && !triggerUsed: + maxChanSize = maxWatchChanSizeWithIndexWithoutTrigger + case !indexExists: + maxChanSize = maxWatchChanSizeWithoutIndex + } + if chanSize > maxChanSize { + chanSize = maxChanSize + } + return chanSize +} + // isIndexValidLocked checks if a given index is still valid. // This assumes that the lock is held. func (w *watchCache) isIndexValidLocked(index int) bool { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 6f31c6c0abf..df6f9c0088b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -866,6 +866,132 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { } } +func TestSuggestedWatchChannelSize(t *testing.T) { + testCases := []struct { + name string + capacity int + indexExists bool + triggerUsed bool + expected int + }{ + { + name: "capacity=100, indexExists, triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: true, + expected: 10, + }, + { + name: "capacity=100, indexExists, !triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: false, + expected: 10, + }, + { + name: "capacity=100, !indexExists", + capacity: 100, + indexExists: false, + triggerUsed: false, + expected: 10, + }, + { + name: "capacity=750, indexExists, triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: true, + expected: 10, + }, + { + name: "capacity=750, indexExists, !triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: false, + expected: 10, + }, + { + name: "capacity=750, !indexExists", + capacity: 750, + indexExists: false, + triggerUsed: false, + expected: 10, + }, + { + name: "capacity=7500, indexExists, triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: true, + expected: 10, + }, + { + name: "capacity=7500, indexExists, !triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: false, + expected: 100, + }, + { + name: "capacity=7500, !indexExists", + capacity: 7500, + indexExists: false, + triggerUsed: false, + expected: 100, + }, + { + name: "capacity=75000, indexExists, triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: true, + expected: 10, + }, + { + name: "capacity=75000, indexExists, !triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: false, + expected: 1000, + }, + { + name: "capacity=75000, !indexExists", + capacity: 75000, + indexExists: false, + triggerUsed: false, + expected: 100, + }, + { + name: "capacity=750000, indexExists, triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: true, + expected: 10, + }, + { + name: "capacity=750000, indexExists, !triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: false, + expected: 1000, + }, + { + name: "capacity=750000, !indexExists", + capacity: 750000, + indexExists: false, + triggerUsed: false, + expected: 100, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + store := newTestWatchCache(test.capacity, &cache.Indexers{}) + got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed) + if got != test.expected { + t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected) + } + }) + } +} + func BenchmarkWatchCache_updateCache(b *testing.B) { store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) store.cache = store.cache[:0]