Merge pull request #109708 from wojtek-t/adjustable_watch_channel_size

Adjust watch channel sizes in watchcache
This commit is contained in:
Kubernetes Prow Robot 2022-07-21 05:03:54 -07:00 committed by GitHub
commit 80c2a0fe39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 186 additions and 15 deletions

View File

@ -480,21 +480,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
}
}
// If there is indexedTrigger defined, but triggerSupported is false,
// we can't narrow the amount of events significantly at this point.
//
// That said, currently indexedTrigger is defined only for couple resources:
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant
// number of watchers for which triggerSupported is false (excluding those
// issued explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.indexedTrigger != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
// It boils down to a tradeoff between:
// - having it as small as possible to reduce memory usage
// - having it large enough to ensure that watchers that need to process
// a bunch of changes have enough buffer to avoid from blocking other
// watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()

View File

@ -18,6 +18,7 @@ package cacher
import (
"fmt"
"math"
"reflect"
"sort"
"sync"
@ -579,6 +580,59 @@ func (w *watchCache) Resync() error {
return nil
}
func (w *watchCache) currentCapacity() int {
w.Lock()
defer w.Unlock()
return w.capacity
}
const (
// minWatchChanSize is the min size of channels used by the watch.
// We keep that set to 10 for "backward compatibility" until we
// convince ourselves based on some metrics that decreasing is safe.
minWatchChanSize = 10
// maxWatchChanSizeWithIndexAndTriger is the max size of the channel
// used by the watch using the index and trigger selector.
maxWatchChanSizeWithIndexAndTrigger = 10
// maxWatchChanSizeWithIndexWithoutTrigger is the max size of the channel
// used by the watch using the index but without triggering selector.
// We keep that set to 1000 for "backward compatibility", until we
// convinced ourselves based on some metrics that decreasing is safe.
maxWatchChanSizeWithIndexWithoutTrigger = 1000
// maxWatchChanSizeWithoutIndex is the max size of the channel
// used by the watch not using the index.
// TODO(wojtek-t): Figure out if the value shouldn't be higher.
maxWatchChanSizeWithoutIndex = 100
)
func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) int {
// To estimate the channel size we use a heuristic that a channel
// should roughly be able to keep one second of history.
// We don't have an exact data, but given we store updates from
// the last <eventFreshDuration>, we approach it by dividing the
// capacity by the length of the history window.
chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds()))
// Finally we adjust the size to avoid ending with too low or
// to large values.
if chanSize < minWatchChanSize {
chanSize = minWatchChanSize
}
var maxChanSize int
switch {
case indexExists && triggerUsed:
maxChanSize = maxWatchChanSizeWithIndexAndTrigger
case indexExists && !triggerUsed:
maxChanSize = maxWatchChanSizeWithIndexWithoutTrigger
case !indexExists:
maxChanSize = maxWatchChanSizeWithoutIndex
}
if chanSize > maxChanSize {
chanSize = maxChanSize
}
return chanSize
}
// isIndexValidLocked checks if a given index is still valid.
// This assumes that the lock is held.
func (w *watchCache) isIndexValidLocked(index int) bool {

View File

@ -866,6 +866,132 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
}
}
func TestSuggestedWatchChannelSize(t *testing.T) {
testCases := []struct {
name string
capacity int
indexExists bool
triggerUsed bool
expected int
}{
{
name: "capacity=100, indexExists, triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: true,
expected: 10,
},
{
name: "capacity=100, indexExists, !triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: false,
expected: 10,
},
{
name: "capacity=100, !indexExists",
capacity: 100,
indexExists: false,
triggerUsed: false,
expected: 10,
},
{
name: "capacity=750, indexExists, triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: true,
expected: 10,
},
{
name: "capacity=750, indexExists, !triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: false,
expected: 10,
},
{
name: "capacity=750, !indexExists",
capacity: 750,
indexExists: false,
triggerUsed: false,
expected: 10,
},
{
name: "capacity=7500, indexExists, triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: true,
expected: 10,
},
{
name: "capacity=7500, indexExists, !triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: false,
expected: 100,
},
{
name: "capacity=7500, !indexExists",
capacity: 7500,
indexExists: false,
triggerUsed: false,
expected: 100,
},
{
name: "capacity=75000, indexExists, triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: true,
expected: 10,
},
{
name: "capacity=75000, indexExists, !triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: false,
expected: 1000,
},
{
name: "capacity=75000, !indexExists",
capacity: 75000,
indexExists: false,
triggerUsed: false,
expected: 100,
},
{
name: "capacity=750000, indexExists, triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: true,
expected: 10,
},
{
name: "capacity=750000, indexExists, !triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: false,
expected: 1000,
},
{
name: "capacity=750000, !indexExists",
capacity: 750000,
indexExists: false,
triggerUsed: false,
expected: 100,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
store := newTestWatchCache(test.capacity, &cache.Indexers{})
got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
if got != test.expected {
t.Errorf("unexpected channel size got: %v, expected: %v", got, test.expected)
}
})
}
}
func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
store.cache = store.cache[:0]