Adjust watch channel sizes in watchcache

This commit is contained in:
Wojciech Tyczyński 2022-04-28 11:56:41 +02:00
parent b96a04df90
commit 0db5c05bdb
3 changed files with 186 additions and 15 deletions

View File

@ -480,21 +480,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
} }
} }
// If there is indexedTrigger defined, but triggerSupported is false, // It boils down to a tradeoff between:
// we can't narrow the amount of events significantly at this point. // - having it as small as possible to reduce memory usage
// // - having it large enough to ensure that watchers that need to process
// That said, currently indexedTrigger is defined only for couple resources: // a bunch of changes have enough buffer to avoid from blocking other
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant // watchers on our watcher having a processing hiccup
// number of watchers for which triggerSupported is false (excluding those chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// issued explicitly by users).
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
if c.indexedTrigger != nil && !triggerSupported {
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
// Determine watch timeout('0' means deadline is not set, ignore checking) // Determine watch timeout('0' means deadline is not set, ignore checking)
deadline, _ := ctx.Deadline() deadline, _ := ctx.Deadline()

View File

@ -18,6 +18,7 @@ package cacher
import ( import (
"fmt" "fmt"
"math"
"reflect" "reflect"
"sort" "sort"
"sync" "sync"
@ -579,6 +580,59 @@ func (w *watchCache) Resync() error {
return nil return nil
} }
// currentCapacity returns the value of w.capacity, read while holding the
// watchCache lock.
func (w *watchCache) currentCapacity() int {
	w.Lock()
	capacity := w.capacity
	w.Unlock()
	return capacity
}
// Lower and upper bounds applied to the watch channel size computed by
// suggestedWatchChannelSize.
const (
// minWatchChanSize is the min size of channels used by the watch.
// We keep that set to 10 for "backward compatibility" until we
// convince ourselves based on some metrics that decreasing is safe.
minWatchChanSize = 10
// maxWatchChanSizeWithIndexAndTrigger is the max size of the channel
// used by the watch using the index and trigger selector.
maxWatchChanSizeWithIndexAndTrigger = 10
// maxWatchChanSizeWithIndexWithoutTrigger is the max size of the channel
// used by the watch using the index but without triggering selector.
// We keep that set to 1000 for "backward compatibility", until we
// convince ourselves based on some metrics that decreasing is safe.
maxWatchChanSizeWithIndexWithoutTrigger = 1000
// maxWatchChanSizeWithoutIndex is the max size of the channel
// used by the watch not using the index.
// TODO(wojtek-t): Figure out if the value shouldn't be higher.
maxWatchChanSizeWithoutIndex = 100
)
// suggestedWatchChannelSize returns the buffer size to use for a watcher's
// channel. indexExists says whether an index is defined, and triggerUsed
// whether the watch makes use of the trigger; together they select the
// upper bound applied to the result.
func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) int {
	// Select the upper bound that matches the kind of watch.
	upperBound := maxWatchChanSizeWithoutIndex
	if indexExists {
		upperBound = maxWatchChanSizeWithIndexWithoutTrigger
		if triggerUsed {
			upperBound = maxWatchChanSizeWithIndexAndTrigger
		}
	}

	// Heuristic: the channel should be able to hold roughly one second of
	// history. Exact data is not available, but since the cache stores
	// updates from the last <eventFreshDuration>, the capacity divided by
	// the length of that window (in seconds) is used as the estimate.
	eventsPerSecond := float64(w.currentCapacity()) / eventFreshDuration.Seconds()
	size := int(math.Ceil(eventsPerSecond))

	// Clamp the estimate so it is neither too small nor too large.
	if size < minWatchChanSize {
		size = minWatchChanSize
	}
	if size > upperBound {
		size = upperBound
	}
	return size
}
// isIndexValidLocked checks if a given index is still valid. // isIndexValidLocked checks if a given index is still valid.
// This assumes that the lock is held. // This assumes that the lock is held.
func (w *watchCache) isIndexValidLocked(index int) bool { func (w *watchCache) isIndexValidLocked(index int) bool {

View File

@ -866,6 +866,132 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
} }
} }
// TestSuggestedWatchChannelSize checks the channel size returned by
// suggestedWatchChannelSize for watch caches of several capacities, for each
// combination of index existence and trigger usage covered by the table.
func TestSuggestedWatchChannelSize(t *testing.T) {
	type scenario struct {
		name        string
		capacity    int
		indexExists bool
		triggerUsed bool
		expected    int
	}

	scenarios := []scenario{
		{name: "capacity=100, indexExists, triggerUsed", capacity: 100, indexExists: true, triggerUsed: true, expected: 10},
		{name: "capacity=100, indexExists, !triggerUsed", capacity: 100, indexExists: true, triggerUsed: false, expected: 10},
		{name: "capacity=100, !indexExists", capacity: 100, indexExists: false, triggerUsed: false, expected: 10},

		{name: "capacity=750, indexExists, triggerUsed", capacity: 750, indexExists: true, triggerUsed: true, expected: 10},
		{name: "capacity=750, indexExists, !triggerUsed", capacity: 750, indexExists: true, triggerUsed: false, expected: 10},
		{name: "capacity=750, !indexExists", capacity: 750, indexExists: false, triggerUsed: false, expected: 10},

		{name: "capacity=7500, indexExists, triggerUsed", capacity: 7500, indexExists: true, triggerUsed: true, expected: 10},
		{name: "capacity=7500, indexExists, !triggerUsed", capacity: 7500, indexExists: true, triggerUsed: false, expected: 100},
		{name: "capacity=7500, !indexExists", capacity: 7500, indexExists: false, triggerUsed: false, expected: 100},

		{name: "capacity=75000, indexExists, triggerUsed", capacity: 75000, indexExists: true, triggerUsed: true, expected: 10},
		{name: "capacity=75000, indexExists, !triggerUsed", capacity: 75000, indexExists: true, triggerUsed: false, expected: 1000},
		{name: "capacity=75000, !indexExists", capacity: 75000, indexExists: false, triggerUsed: false, expected: 100},

		{name: "capacity=750000, indexExists, triggerUsed", capacity: 750000, indexExists: true, triggerUsed: true, expected: 10},
		{name: "capacity=750000, indexExists, !triggerUsed", capacity: 750000, indexExists: true, triggerUsed: false, expected: 1000},
		{name: "capacity=750000, !indexExists", capacity: 750000, indexExists: false, triggerUsed: false, expected: 100},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Each subtest builds a fresh watch cache with the capacity under test.
			wc := newTestWatchCache(sc.capacity, &cache.Indexers{})
			if got := wc.suggestedWatchChannelSize(sc.indexExists, sc.triggerUsed); got != sc.expected {
				t.Errorf("unexpected channel size got: %v, expected: %v", got, sc.expected)
			}
		})
	}
}
func BenchmarkWatchCache_updateCache(b *testing.B) { func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
store.cache = store.cache[:0] store.cache = store.cache[:0]